154 lines
5.1 KiB
Go
154 lines
5.1 KiB
Go
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/
|
|
|
|
package comm
|
|
|
|
import (
|
|
"errors"
|
|
"testing"
|
|
"time"
|
|
|
|
proto "github.com/hyperledger/fabric-protos-go/gossip"
|
|
"github.com/hyperledger/fabric/gossip/common"
|
|
"github.com/hyperledger/fabric/gossip/protoext"
|
|
"github.com/hyperledger/fabric/gossip/util"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestInterceptAcks(t *testing.T) {
|
|
pubsub := util.NewPubSub()
|
|
pkiID := common.PKIidType("pkiID")
|
|
msgs := make(chan *protoext.SignedGossipMessage, 1)
|
|
handlerFunc := func(message *protoext.SignedGossipMessage) {
|
|
msgs <- message
|
|
}
|
|
wrappedHandler := interceptAcks(handlerFunc, pkiID, pubsub)
|
|
ack := &protoext.SignedGossipMessage{
|
|
GossipMessage: &proto.GossipMessage{
|
|
Nonce: 1,
|
|
Content: &proto.GossipMessage_Ack{
|
|
Ack: &proto.Acknowledgement{},
|
|
},
|
|
},
|
|
}
|
|
sub := pubsub.Subscribe(topicForAck(1, pkiID), time.Second)
|
|
wrappedHandler(ack)
|
|
// Ensure ack was consumed and not passed onwards to the wrapped hander
|
|
require.Len(t, msgs, 0)
|
|
_, err := sub.Listen()
|
|
// Ensure ack was published
|
|
require.NoError(t, err)
|
|
|
|
// Test none acks are just forwarded
|
|
notAck := &protoext.SignedGossipMessage{
|
|
GossipMessage: &proto.GossipMessage{
|
|
Nonce: 2,
|
|
Content: &proto.GossipMessage_DataMsg{
|
|
DataMsg: &proto.DataMessage{},
|
|
},
|
|
},
|
|
}
|
|
sub = pubsub.Subscribe(topicForAck(2, pkiID), time.Second)
|
|
wrappedHandler(notAck)
|
|
// Ensure message was passed to the wrapped handler
|
|
require.Len(t, msgs, 1)
|
|
_, err = sub.Listen()
|
|
// Ensure ack was not published
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestAck(t *testing.T) {
|
|
comm1, _ := newCommInstance(t, naiveSec)
|
|
comm2, port2 := newCommInstance(t, naiveSec)
|
|
defer comm2.Stop()
|
|
comm3, port3 := newCommInstance(t, naiveSec)
|
|
defer comm3.Stop()
|
|
comm4, port4 := newCommInstance(t, naiveSec)
|
|
defer comm4.Stop()
|
|
|
|
acceptData := func(o interface{}) bool {
|
|
m := o.(protoext.ReceivedMessage).GetGossipMessage()
|
|
return protoext.IsDataMsg(m.GossipMessage)
|
|
}
|
|
|
|
ack := func(c <-chan protoext.ReceivedMessage) {
|
|
msg := <-c
|
|
msg.Ack(nil)
|
|
}
|
|
|
|
nack := func(c <-chan protoext.ReceivedMessage) {
|
|
msg := <-c
|
|
msg.Ack(errors.New("Failed processing message because reasons"))
|
|
}
|
|
|
|
// Have instances 2 and 3 subscribe to data messages, and ack them
|
|
inc2 := comm2.Accept(acceptData)
|
|
inc3 := comm3.Accept(acceptData)
|
|
|
|
// Collect 2 out of 2 acks - should succeed
|
|
go ack(inc2)
|
|
go ack(inc3)
|
|
res := comm1.SendWithAck(createGossipMsg(), time.Second*3, 2, remotePeer(port2), remotePeer(port3))
|
|
require.Len(t, res, 2)
|
|
require.Empty(t, res[0].Error())
|
|
require.Empty(t, res[1].Error())
|
|
|
|
// Collect 2 out of 3 acks - should succeed
|
|
t1 := time.Now()
|
|
go ack(inc2)
|
|
go ack(inc3)
|
|
res = comm1.SendWithAck(createGossipMsg(), time.Second*10, 2, remotePeer(port2), remotePeer(port3), remotePeer(port4))
|
|
elapsed := time.Since(t1)
|
|
require.Len(t, res, 2)
|
|
require.Empty(t, res[0].Error())
|
|
require.Empty(t, res[1].Error())
|
|
// Collection of 2 out of 3 acks should have taken much less than the timeout (10 seconds)
|
|
require.True(t, elapsed < time.Second*5)
|
|
|
|
// Collect 2 out of 3 acks - should fail, because peer3 now have sent an error along with the ack
|
|
go ack(inc2)
|
|
go nack(inc3)
|
|
res = comm1.SendWithAck(createGossipMsg(), time.Second*10, 2, remotePeer(port2), remotePeer(port3), remotePeer(port4))
|
|
require.Len(t, res, 3)
|
|
require.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "Failed processing message because reasons")
|
|
require.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "timed out")
|
|
|
|
// Collect 2 out of 2 acks - should fail because comm2 and comm3 now don't acknowledge messages
|
|
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 2, remotePeer(port2), remotePeer(port3))
|
|
require.Len(t, res, 2)
|
|
require.Contains(t, res[0].Error(), "timed out")
|
|
require.Contains(t, res[1].Error(), "timed out")
|
|
// Drain ack messages to prepare for next salvo
|
|
<-inc2
|
|
<-inc3
|
|
|
|
// Collect 2 out of 3 acks - should fail
|
|
go ack(inc2)
|
|
go nack(inc3)
|
|
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 2, remotePeer(port2), remotePeer(port3), remotePeer(port4))
|
|
require.Len(t, res, 3)
|
|
require.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "") // This is the "successful ack"
|
|
require.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "Failed processing message because reasons")
|
|
require.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "timed out")
|
|
require.Contains(t, res.String(), "\"Failed processing message because reasons\":1")
|
|
require.Contains(t, res.String(), "\"timed out\":1")
|
|
require.Contains(t, res.String(), "\"successes\":1")
|
|
require.Equal(t, 2, res.NackCount())
|
|
require.Equal(t, 1, res.AckCount())
|
|
|
|
// Send a message to no one
|
|
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 1)
|
|
require.Len(t, res, 0)
|
|
|
|
// Send a message while stopping
|
|
comm1.Stop()
|
|
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 1, remotePeer(port2), remotePeer(port3), remotePeer(port4))
|
|
require.Len(t, res, 3)
|
|
require.Contains(t, res[0].Error(), "comm is stopping")
|
|
require.Contains(t, res[1].Error(), "comm is stopping")
|
|
require.Contains(t, res[2].Error(), "comm is stopping")
|
|
}
|