/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package cluster_test

import (
"context"
"crypto/rand"
"crypto/x509"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
comm_utils "github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/onsi/gomega"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)

const (
testChannel = "test"
testChannel2 = "test2"
timeout = time.Second * 10
)

var (
// CA that generates TLS key-pairs.
// We use only one CA because the authentication
// is based on TLS pinning.
ca = createCAOrPanic()
lastNodeID uint64
testSubReq = &orderer.SubmitRequest{
Channel: "test",
}
testReq = &orderer.SubmitRequest{
Channel: "test",
Payload: &common.Envelope{
Payload: []byte("test"),
},
}
testReq2 = &orderer.SubmitRequest{
Channel: testChannel2,
Payload: &common.Envelope{
Payload: []byte(testChannel2),
},
}
testRes = &orderer.SubmitResponse{
Info: "test",
}
fooReq = wrapSubmitReq(&orderer.SubmitRequest{
Channel: "foo",
})
barReq = wrapSubmitReq(&orderer.SubmitRequest{
Channel: "bar",
})
testConsensusReq = &orderer.StepRequest{
Payload: &orderer.StepRequest_ConsensusRequest{
ConsensusRequest: &orderer.ConsensusRequest{
Payload: []byte{1, 2, 3},
Channel: testChannel,
},
},
}
channelExtractor = &mockChannelExtractor{}
)
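
// nextUnusedID returns a node ID that has not been handed out before.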
func nextUnusedID() uint64 {
return atomic.AddUint64(&lastNodeID, 1)
}
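
// createCAOrPanic creates the CA shared by all test nodes, panicking on failure.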
func createCAOrPanic() tlsgen.CA {
ca, err := tlsgen.NewCA()
if err != nil {
panic(fmt.Sprintf("failed creating CA: %+v", err))
}
return ca
}
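
// mockChannelExtractor extracts the target channel from submit and consensus requests.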
type mockChannelExtractor struct{}
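
// TargetChannel returns the channel the given request is destined for,
// or an empty string if it cannot be extracted.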
func (*mockChannelExtractor) TargetChannel(msg proto.Message) string {
switch req := msg.(type) {
case *orderer.ConsensusRequest:
return req.Channel
case *orderer.SubmitRequest:
return req.Channel
default:
return ""
}
}
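
// clusterServer is the gRPC service implementation a test node registers on its server.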
type clusterServer interface {
// Step passes an implementation-specific message to another cluster member.
Step(server orderer.Cluster_StepServer) error
}
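
// clusterNode bundles everything a test cluster member needs:
// a gRPC server, a dialer, a mocked handler and the Comm instance under test.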
type clusterNode struct {
lock sync.Mutex
frozen bool
freezeCond sync.Cond
dialer *cluster.PredicateDialer
handler *mocks.Handler
nodeInfo cluster.RemoteNode
srv *comm_utils.GRPCServer
bindAddress string
clientConfig comm_utils.ClientConfig
serverConfig comm_utils.ServerConfig
c *cluster.Comm
dispatcher clusterServer
}
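
// Step receives a single request from the stream and dispatches it
// to the node's Comm instance.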
func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
cn.waitIfFrozen()
req, err := stream.Recv()
if err != nil {
return err
}
if submitReq := req.GetSubmitRequest(); submitReq != nil {
return cn.c.DispatchSubmit(stream.Context(), submitReq)
}
if err := cn.c.DispatchConsensus(stream.Context(), req.GetConsensusRequest()); err != nil {
return err
}
return stream.Send(&orderer.StepResponse{})
}
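
// waitIfFrozen blocks the caller for as long as the node is frozen.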
func (cn *clusterNode) waitIfFrozen() {
cn.lock.Lock()
defer cn.lock.Unlock()
// There is no freeze after an unfreeze, so a single
// condition check suffices instead of a for loop.
if cn.frozen {
cn.freezeCond.Wait()
}
}
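
// freeze causes subsequent Step invocations to block until unfreeze is called.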
func (cn *clusterNode) freeze() {
cn.lock.Lock()
defer cn.lock.Unlock()
cn.frozen = true
}
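
// unfreeze releases all Step invocations blocked by a previous freeze.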
func (cn *clusterNode) unfreeze() {
cn.lock.Lock()
cn.frozen = false
cn.lock.Unlock()
cn.freezeCond.Broadcast()
}
func (cn *clusterNode) resurrect() {
gRPCServer, err := comm_utils.NewGRPCServer(cn.bindAddress, cn.serverConfig)
if err != nil {
panic(fmt.Errorf("failed starting gRPC server: %v", err))
}
cn.srv = gRPCServer
orderer.RegisterClusterServer(gRPCServer.Server(), cn.dispatcher)
go cn.srv.Start()
}
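
// stop shuts down the node's gRPC server and its communication layer.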
func (cn *clusterNode) stop() {
cn.srv.Stop()
cn.c.Shutdown()
}
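
// renewCertificates issues fresh client and server TLS key pairs from the
// shared CA and installs them into the node's configuration.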
func (cn *clusterNode) renewCertificates() {
clientKeyPair, err := ca.NewClientCertKeyPair()
if err != nil {
panic(fmt.Errorf("failed creating client certificate %v", err))
}
serverKeyPair, err := ca.NewServerCertKeyPair("127.0.0.1")
if err != nil {
panic(fmt.Errorf("failed creating server certificate %v", err))
}
cn.nodeInfo.ClientTLSCert = clientKeyPair.TLSCert.Raw
cn.nodeInfo.ServerTLSCert = serverKeyPair.TLSCert.Raw
cn.serverConfig.SecOpts.Certificate = serverKeyPair.Cert
cn.serverConfig.SecOpts.Key = serverKeyPair.Key
cn.dialer.Config.SecOpts.Key = clientKeyPair.Key
cn.dialer.Config.SecOpts.Certificate = clientKeyPair.Cert
}
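
// newTestNodeWithMetrics starts a cluster node whose Comm instance reports
// to the given metrics provider and TLS connection gauge.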
func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsConnGauge metrics.Gauge) *clusterNode {
serverKeyPair, err := ca.NewServerCertKeyPair("127.0.0.1")
require.NoError(t, err)
clientKeyPair, _ := ca.NewClientCertKeyPair()
handler := &mocks.Handler{}
clientConfig := comm_utils.ClientConfig{
AsyncConnect: true,
DialTimeout: time.Hour,
SecOpts: comm_utils.SecureOptions{
RequireClientCert: true,
Key: clientKeyPair.Key,
Certificate: clientKeyPair.Cert,
ServerRootCAs: [][]byte{ca.CertBytes()},
UseTLS: true,
ClientRootCAs: [][]byte{ca.CertBytes()},
},
}
dialer := &cluster.PredicateDialer{
Config: clientConfig,
}
srvConfig := comm_utils.ServerConfig{
SecOpts: comm_utils.SecureOptions{
Key: serverKeyPair.Key,
Certificate: serverKeyPair.Cert,
UseTLS: true,
},
}
gRPCServer, err := comm_utils.NewGRPCServer("127.0.0.1:", srvConfig)
require.NoError(t, err)
tstSrv := &clusterNode{
dialer: dialer,
clientConfig: clientConfig,
serverConfig: srvConfig,
bindAddress: gRPCServer.Address(),
handler: handler,
nodeInfo: cluster.RemoteNode{
NodeAddress: cluster.NodeAddress{
Endpoint: gRPCServer.Address(),
ID: nextUnusedID(),
},
NodeCerts: cluster.NodeCerts{
ServerTLSCert: serverKeyPair.TLSCert.Raw,
ClientTLSCert: clientKeyPair.TLSCert.Raw,
},
},
srv: gRPCServer,
}
if tstSrv.dispatcher == nil {
tstSrv.dispatcher = tstSrv
}
tstSrv.freezeCond.L = &tstSrv.lock
compareCert := cluster.CachePublicKeyComparisons(func(a, b []byte) bool {
return crypto.CertificatesWithSamePublicKey(a, b) == nil
})
tstSrv.c = &cluster.Comm{
CertExpWarningThreshold: time.Hour,
SendBufferSize: 1,
Logger: flogging.MustGetLogger("test"),
Chan2Members: make(cluster.MembersByChannel),
H: handler,
ChanExt: channelExtractor,
Connections: cluster.NewConnectionStore(dialer, tlsConnGauge),
Metrics: cluster.NewMetrics(metrics),
CompareCertificate: compareCert,
}
orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv.dispatcher)
go gRPCServer.Start()
return tstSrv
}
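
// newTestNode starts a cluster node with metrics reporting disabled.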
func newTestNode(t *testing.T) *clusterNode {
return newTestNodeWithMetrics(t, &disabled.Provider{}, &disabled.Gauge{})
}

func TestSendBigMessage(t *testing.T) {
// Scenario: Basic test that spawns 5 nodes and sends a big message
// from one of the nodes to the others.
// A receiver node's Step() server-side method (which calls Recv)
// is frozen until the sender node's Send method returns,
// hence the sender node finishes calling Send
// before a receiver node starts calling Recv.
// This ensures that Send is non-blocking even with big messages.
// In the test, we send a total of 8MB of random data (2MB to each node).
// The randomness is used so gRPC compression won't compress it to a lower size.
node1 := newTestNode(t)
node2 := newTestNode(t)
node3 := newTestNode(t)
node4 := newTestNode(t)
node5 := newTestNode(t)
for _, node := range []*clusterNode{node2, node3, node4, node5} {
node.c.SendBufferSize = 1
}
defer node1.stop()
defer node2.stop()
defer node3.stop()
defer node4.stop()
defer node5.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo, node3.nodeInfo, node4.nodeInfo, node5.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
node3.c.Configure(testChannel, config)
node4.c.Configure(testChannel, config)
node5.c.Configure(testChannel, config)
var messageReceived sync.WaitGroup
messageReceived.Add(4)
msgSize := 1024 * 1024 * 2
bigMsg := &orderer.ConsensusRequest{
Channel: testChannel,
Payload: make([]byte, msgSize),
}
_, err := rand.Read(bigMsg.Payload)
require.NoError(t, err)
wrappedMsg := &orderer.StepRequest{
Payload: &orderer.StepRequest_ConsensusRequest{
ConsensusRequest: bigMsg,
},
}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
node.handler.On("OnConsensus", testChannel, node1.nodeInfo.ID, mock.Anything).Run(func(args mock.Arguments) {
msg := args.Get(2).(*orderer.ConsensusRequest)
require.Len(t, msg.Payload, msgSize)
messageReceived.Done()
}).Return(nil)
}
streams := map[uint64]*cluster.Stream{}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
// Freeze the node, in order to block its Recv
node.freeze()
}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
rm, err := node1.c.Remote(testChannel, node.nodeInfo.ID)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, rm)
streams[node.nodeInfo.ID] = stream
}
t0 := time.Now()
for _, node := range []*clusterNode{node2, node3, node4, node5} {
stream := streams[node.nodeInfo.ID]
t1 := time.Now()
err = stream.Send(wrappedMsg)
require.NoError(t, err)
t.Log("Sending took", time.Since(t1))
// Unfreeze the node. It can now call Recv, and signal the messageReceived waitGroup.
node.unfreeze()
}
t.Log("Total sending time to all 4 nodes took:", time.Since(t0))
messageReceived.Wait()
}

func TestBlockingSend(t *testing.T) {
// Scenario: Basic test that spawns 2 nodes and sends three SubmitRequests
// or three ConsensusRequests from the first node to the second.
// SubmitRequests should block, but ConsensusRequests should not.
for _, testCase := range []struct {
description string
messageToSend *orderer.StepRequest
streamUnblocks bool
elapsedGreaterThan time.Duration
overflowErr string
}{
{
description: "SubmitRequest",
messageToSend: wrapSubmitReq(testReq),
streamUnblocks: true,
elapsedGreaterThan: time.Second / 2,
},
{
description: "ConsensusRequest",
messageToSend: testConsensusReq,
overflowErr: "send queue overflown",
},
} {
t.Run(testCase.description, func(t *testing.T) {
node1 := newTestNode(t)
node2 := newTestNode(t)
node1.c.SendBufferSize = 1
node2.c.SendBufferSize = 1
defer node1.stop()
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(errors.Errorf("whoops")).Once()
rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
fakeStream := &mocks.StepClientStream{}
rm.GetStreamFunc = func(ctx context.Context) (cluster.StepClientStream, error) {
return fakeStream, nil
}
rm.ProbeConn = func(_ *grpc.ClientConn) error {
return nil
}
fakeStream.On("Context", mock.Anything).Return(context.Background())
fakeStream.On("Auth").Return(nil).Once()
unBlock := make(chan struct{})
var sendInvoked sync.WaitGroup
sendInvoked.Add(1)
var once sync.Once
fakeStream.On("Send", mock.Anything).Run(func(_ mock.Arguments) {
once.Do(sendInvoked.Done)
<-unBlock
}).Return(errors.New("oops"))
stream, err := rm.NewStream(time.Hour)
require.NoError(t, err)
// The first send doesn't block, even though the Send operation blocks.
err = stream.Send(testCase.messageToSend)
require.NoError(t, err)
// The second one doesn't block either.
// After this point, we have 1 goroutine which is blocked on Send(),
// and one message in the buffer.
sendInvoked.Wait()
err = stream.Send(testCase.messageToSend)
require.NoError(t, err)
// The third blocks, so we need to unblock it ourselves
// in order for it to go through, unless the operation
// is non-blocking.
go func() {
time.Sleep(time.Second)
if testCase.streamUnblocks {
close(unBlock)
}
}()
t1 := time.Now()
err = stream.Send(testCase.messageToSend)
// The third send always overflows or blocks.
// If we expect to receive an overflow error - assert it.
if testCase.overflowErr != "" {
require.EqualError(t, err, testCase.overflowErr)
}
elapsed := time.Since(t1)
t.Log("Elapsed time:", elapsed)
require.True(t, elapsed > testCase.elapsedGreaterThan)
if !testCase.streamUnblocks {
close(unBlock)
}
})
}
}

func TestEmptyRequest(t *testing.T) {
// Scenario: Ensures empty messages are discarded and an error is
// returned to the sender.
node1 := newTestNode(t)
node2 := newTestNode(t)
node2.srv.Stop()
svc := &cluster.Service{
StepLogger: flogging.MustGetLogger("test"),
Logger: flogging.MustGetLogger("test"),
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Dispatcher: node2.c,
}
node2.dispatcher = svc
// Sleep to let the gRPC service be closed
time.Sleep(time.Second)
// Resurrect the node with the new dispatcher
node2.resurrect()
defer node1.stop()
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
assertBiDiCommunication(t, node1, node2, testReq)
rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
stream, err := rm.NewStream(time.Second * 10)
require.NoError(t, err)
err = stream.Send(&orderer.StepRequest{})
require.NoError(t, err)
_, err = stream.Recv()
require.Error(t, err, "message is neither a Submit nor a Consensus request")
}

func TestBasic(t *testing.T) {
// Scenario: Basic test that spawns 2 nodes and sends each other
// messages that are expected to be echoed back
node1 := newTestNode(t)
node2 := newTestNode(t)
defer node1.stop()
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
assertBiDiCommunication(t, node1, node2, testReq)
}

func TestUnavailableHosts(t *testing.T) {
// Scenario: A node is configured to connect
// to a host that is down
node1 := newTestNode(t)
clientConfig := node1.dialer.Config
// The below timeout makes sure that connection establishment is done
// asynchronously. Had it been synchronous, the Remote() call would be
// blocked for an hour.
clientConfig.DialTimeout = time.Hour
defer node1.stop()
node2 := newTestNode(t)
node2.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
remote, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
require.NotNil(t, remote)
_, err = remote.NewStream(time.Millisecond * 100)
require.Contains(t, err.Error(), "connection")
}

func TestStreamAbortReportCorrectError(t *testing.T) {
// Scenario: node 1 acquires a stream to node 2, the stream
// encounters an error, and as a result it is aborted.
// We ensure the error reported is the first error, even after
// multiple attempts to use the stream.
node1 := newTestNode(t)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(errors.Errorf("whoops")).Once()
rm1, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
var streamTerminated sync.WaitGroup
streamTerminated.Add(1)
stream := assertEventualEstablishStream(t, rm1)
l, err := zap.NewDevelopment()
require.NoError(t, err)
stream.Logger = flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "Stream 1 to") && strings.Contains(entry.Message, "terminated") {
streamTerminated.Done()
}
return nil
}))
// Probe the stream for the first time
err = stream.Send(wrapSubmitReq(testReq))
require.NoError(t, err)
// We should receive back the crafted error
_, err = stream.Recv()
require.Contains(t, err.Error(), "whoops")
// Wait for the stream to be terminated from within the communication infrastructure
streamTerminated.Wait()
// We should still receive the original crafted error despite the stream being terminated
err = stream.Send(wrapSubmitReq(testReq))
require.Contains(t, err.Error(), "whoops")
}

func TestStreamAbort(t *testing.T) {
// Scenario: node 1 is connected to node 2 in 2 channels,
// and the consumer of the communication calls Recv().
// Two sub-scenarios are exercised:
// 1) The server certificate of node 2 changes in the first channel
// 2) Node 2 is evicted from the membership of the first channel
// In both scenarios, the Recv() call should be aborted
node2 := newTestNode(t)
defer node2.stop()
invalidNodeInfo := cluster.RemoteNode{
NodeAddress: cluster.NodeAddress{ID: node2.nodeInfo.ID},
NodeCerts: cluster.NodeCerts{
ServerTLSCert: []byte{1, 2, 3},
ClientTLSCert: []byte{1, 2, 3},
},
}
for _, tst := range []struct {
testName string
membership []cluster.RemoteNode
expectedError string
}{
{
testName: "Evicted from membership",
membership: nil,
expectedError: "rpc error: code = Canceled desc = context canceled",
},
{
testName: "Changed TLS certificate",
membership: []cluster.RemoteNode{invalidNodeInfo},
expectedError: "rpc error: code = Canceled desc = context canceled",
},
} {
t.Run(tst.testName, func(t *testing.T) {
testStreamAbort(t, node2, tst.membership, tst.expectedError)
})
}
node2.handler.AssertNumberOfCalls(t, "OnSubmit", 2)
}

func testStreamAbort(t *testing.T, node2 *clusterNode, newMembership []cluster.RemoteNode, expectedError string) {
node1 := newTestNode(t)
defer node1.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})
node1.c.Configure(testChannel2, []cluster.RemoteNode{node2.nodeInfo})
node2.c.Configure(testChannel2, []cluster.RemoteNode{node1.nodeInfo})
var streamCreated sync.WaitGroup
streamCreated.Add(1)
stopChan := make(chan struct{})
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Once().Run(func(_ mock.Arguments) {
// Notify the stream was created
streamCreated.Done()
// Wait for the test to finish
<-stopChan
}).Return(nil).Once()
rm1, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
go func() {
stream := assertEventualEstablishStream(t, rm1)
// Signal the reconfiguration
err = stream.Send(wrapSubmitReq(testReq))
require.NoError(t, err)
_, err := stream.Recv()
require.Contains(t, err.Error(), expectedError)
close(stopChan)
}()
go func() {
// Wait for the stream reference to be obtained
streamCreated.Wait()
// Reconfigure the channel membership
node1.c.Configure(testChannel, newMembership)
}()
<-stopChan
}

func TestDoubleReconfigure(t *testing.T) {
// Scenario: Basic test that spawns 2 nodes
// and configures node 1 twice, and checks that
// node 1's remote stub to node 2 wasn't re-created by the second
// configuration since it already existed
node1 := newTestNode(t)
node2 := newTestNode(t)
defer node1.stop()
defer node2.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
rm1, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
rm2, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
// Ensure the references are equal
require.True(t, rm1 == rm2)
}

func TestInvalidChannel(t *testing.T) {
// Scenario: node 1 is ordered to send a message on a channel
// that doesn't exist, and also receives a message, but
// the channel cannot be extracted from the message.
t.Run("channel doesn't exist", func(t *testing.T) {
node1 := newTestNode(t)
defer node1.stop()
_, err := node1.c.Remote(testChannel, 0)
require.EqualError(t, err, "channel test doesn't exist")
})
t.Run("channel cannot be extracted", func(t *testing.T) {
node1 := newTestNode(t)
defer node1.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() (bool, error) {
_, err := node1.c.Remote(testChannel, node1.nodeInfo.ID)
return true, err
}, time.Minute).Should(gomega.BeTrue())
stub, err := node1.c.Remote(testChannel, node1.nodeInfo.ID)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, stub)
// An empty SubmitRequest has an empty channel which is invalid
err = stream.Send(wrapSubmitReq(&orderer.SubmitRequest{}))
require.NoError(t, err)
_, err = stream.Recv()
require.EqualError(t, err, "rpc error: code = Unknown desc = badly formatted message, cannot extract channel")
// Test directly without going through the gRPC stream
err = node1.c.DispatchSubmit(context.Background(), &orderer.SubmitRequest{})
require.EqualError(t, err, "badly formatted message, cannot extract channel")
})
}

func TestAbortRPC(t *testing.T) {
// Scenarios:
// (I) The node calls an RPC, and calls Abort() on the remote context
// in parallel. The RPC should return even though the server-side call hasn't finished.
// (II) The node calls an RPC, but the server-side processing takes too long,
// and the RPC invocation returns prematurely.
testCases := []struct {
name string
abortFunc func(*cluster.RemoteContext)
rpcTimeout time.Duration
expectedErr string
}{
{
name: "Abort() called",
expectedErr: "rpc error: code = Canceled desc = context canceled",
rpcTimeout: time.Hour,
abortFunc: func(rc *cluster.RemoteContext) {
rc.Abort()
},
},
{
name: "RPC timeout",
expectedErr: "rpc timeout expired",
rpcTimeout: time.Second,
abortFunc: func(*cluster.RemoteContext) {},
},
}
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
testAbort(t, testCase.abortFunc, testCase.rpcTimeout, testCase.expectedErr)
})
}
}

func testAbort(t *testing.T, abortFunc func(*cluster.RemoteContext), rpcTimeout time.Duration, expectedErr string) {
node1 := newTestNode(t)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
var onStepCalled sync.WaitGroup
onStepCalled.Add(1)
// stuckCall ensures the OnSubmit() call is stuck throughout this test
var stuckCall sync.WaitGroup
stuckCall.Add(1)
// At the end of the test, release the server-side resources
defer stuckCall.Done()
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(_ mock.Arguments) {
onStepCalled.Done()
stuckCall.Wait()
}).Once()
rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
go func() {
onStepCalled.Wait()
abortFunc(rm)
}()
var stream *cluster.Stream
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() error {
stream, err = rm.NewStream(rpcTimeout)
return err
}, time.Second*10, time.Millisecond*10).Should(gomega.Succeed())
stream.Send(wrapSubmitReq(testSubReq))
_, err = stream.Recv()
require.EqualError(t, err, expectedErr)
node2.handler.AssertNumberOfCalls(t, "OnSubmit", 1)
}

func TestNoTLSCertificate(t *testing.T) {
// Scenario: The node is sent a message by another node that doesn't
// connect with mutual TLS, thus doesn't provide a TLS certificate
node1 := newTestNode(t)
defer node1.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})
clientConfig := comm_utils.ClientConfig{
AsyncConnect: true,
DialTimeout: time.Millisecond * 100,
SecOpts: comm_utils.SecureOptions{
ServerRootCAs: [][]byte{ca.CertBytes()},
UseTLS: true,
},
}
var conn *grpc.ClientConn
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() (bool, error) {
var err error
conn, err = clientConfig.Dial(node1.srv.Address())
return true, err
}, time.Minute).Should(gomega.BeTrue())
echoClient := orderer.NewClusterClient(conn)
stream, err := echoClient.Step(context.Background())
require.NoError(t, err)
err = stream.Send(wrapSubmitReq(testSubReq))
require.NoError(t, err)
_, err = stream.Recv()
require.EqualError(t, err, "rpc error: code = Unknown desc = no TLS certificate sent")
}

func TestReconnect(t *testing.T) {
// Scenario: node 1 and node 2 are connected,
// and node 2 is taken offline.
// Node 1 tries to send a message to node 2 but fails;
// afterwards node 2 is brought back, node 1 sends more messages,
// and sending to node 2 should eventually succeed.
node1 := newTestNode(t)
defer node1.stop()
conf := node1.dialer.Config
conf.DialTimeout = time.Hour
node2 := newTestNode(t)
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil)
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
// Make node 2 be offline by shutting down its gRPC service
node2.srv.Stop()
// Obtain the stub for node 2.
// Should succeed, because the connection was created at time of configuration
stub, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
// Try to obtain a stream. Should not succeed.
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() error {
_, err = stub.NewStream(time.Hour)
return err
}).Should(gomega.Not(gomega.Succeed()))
// Wait for the port to be released
for {
lsnr, err := net.Listen("tcp", node2.nodeInfo.Endpoint)
if err == nil {
lsnr.Close()
break
}
}
// Resurrect node 2
node2.resurrect()
// Send a message from node 1 to node 2.
// Should succeed eventually
assertEventualSendMessage(t, stub, testReq)
}

func TestRenewCertificates(t *testing.T) {
// Scenario: node 1 and node 2 are connected,
// Node 2's certificate is renewed, and
// node 1 is reconfigured with the new
// configuration without being restarted.
node1 := newTestNode(t)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
node1.handler.On("OnStep", testChannel, node2.nodeInfo.ID, mock.Anything).Return(testRes, nil)
node2.handler.On("OnStep", testChannel, node1.nodeInfo.ID, mock.Anything).Return(testRes, nil)
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
assertBiDiCommunication(t, node1, node2, testReq)
// Close outgoing connections from node2 to node1
node2.c.Configure(testChannel, nil)
// Stop the gRPC service of node 2 to replace its certificate
node2.srv.Stop()
// Wait until node 1 detects this
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() error {
remote, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
if err != nil {
return err
}
stream, err := remote.NewStream(time.Hour)
if err != nil {
return err
}
err = stream.Send(wrapSubmitReq(testSubReq))
if err != nil {
return err
}
return nil
}).Should(gomega.Not(gomega.Succeed()))
// Renew node 2's keys
node2.renewCertificates()
// Resurrect node 2 to make it service connections again
node2.resurrect()
// W.L.O.G, try to send a message from node1 to node2
// It should fail, because node2's server certificate has now changed,
// so it closed the connection to the remote node
info2 := node2.nodeInfo
remote, err := node1.c.Remote(testChannel, info2.ID)
require.NoError(t, err)
require.NotNil(t, remote)
_, err = remote.NewStream(time.Hour)
require.Contains(t, err.Error(), info2.Endpoint)
// Reconfigure both nodes with the updated keys
config = []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
// Finally, check that the nodes can communicate once again
assertBiDiCommunication(t, node1, node2, testReq)
}

func TestMembershipReconfiguration(t *testing.T) {
// Scenario: node 1 and node 2 are started up
// and node 2 is configured to know about node 1,
// without node1 knowing about node 2.
// The communication between them should only work
// after node 1 is configured to know about node 2.
node1 := newTestNode(t)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
node1.c.Configure(testChannel, []cluster.RemoteNode{})
node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})
// Node 1 can't connect to node 2 because it doesn't know its TLS certificate yet
_, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.EqualError(t, err, fmt.Sprintf("node %d doesn't exist in channel test's membership", node2.nodeInfo.ID))
// Node 2 can connect to node 1, but it can't send it messages because node 1 doesn't know node 2 yet.
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() (bool, error) {
_, err := node2.c.Remote(testChannel, node1.nodeInfo.ID)
return true, err
}, time.Minute).Should(gomega.BeTrue())
stub, err := node2.c.Remote(testChannel, node1.nodeInfo.ID)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, stub)
err = stream.Send(wrapSubmitReq(testSubReq))
require.NoError(t, err)
_, err = stream.Recv()
require.EqualError(t, err, "rpc error: code = Unknown desc = certificate extracted from TLS connection isn't authorized")
// Next, configure node 1 to know about node 2
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
// Check that the communication works correctly between both nodes
assertBiDiCommunication(t, node1, node2, testReq)
assertBiDiCommunication(t, node2, node1, testReq)
// Reconfigure node 2 to forget about node 1
node2.c.Configure(testChannel, []cluster.RemoteNode{})
// Node 1 can still connect to node 2
stub, err = node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
// But it can't send a message because node 2 no longer authorizes node 1
stream = assertEventualEstablishStream(t, stub)
stream.Send(wrapSubmitReq(testSubReq))
_, err = stream.Recv()
require.EqualError(t, err, "rpc error: code = Unknown desc = certificate extracted from TLS connection isn't authorized")
}

func TestShutdown(t *testing.T) {
// Scenario: node 1 is shut down and as a result, can't
// send messages to anyone, nor can it be reconfigured
node1 := newTestNode(t)
defer node1.stop()
node1.c.Shutdown()
// Obtaining a RemoteContext cannot succeed because shutdown was called before
_, err := node1.c.Remote(testChannel, node1.nodeInfo.ID)
require.EqualError(t, err, "communication has been shut down")
node2 := newTestNode(t)
defer node2.stop()
node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})
// Configuration of node 1 doesn't take place since it was shut down
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() error {
_, err := node2.c.Remote(testChannel, node1.nodeInfo.ID)
return err
}, time.Minute).Should(gomega.Succeed())
stub, err := node2.c.Remote(testChannel, node1.nodeInfo.ID)
require.NoError(t, err)
// Therefore, sending a message doesn't succeed because node 1 rejected the configuration change
gt.Eventually(func() string {
stream, err := stub.NewStream(time.Hour)
if err != nil {
return err.Error()
}
err = stream.Send(wrapSubmitReq(testSubReq))
require.NoError(t, err)
_, err = stream.Recv()
return err.Error()
}, timeout).Should(gomega.ContainSubstring("channel test doesn't exist"))
}

func TestMultiChannelConfig(t *testing.T) {
// Scenario: node 1 knows node 2 only in channel "foo"
// and knows node 3 only in channel "bar".
// Messages that are received are routed according to their corresponding channels,
// and when node 2 sends a message for channel "bar" to node 1, it is rejected.
// The same applies to node 3 when it sends a message to node 1 in channel "foo".
node1 := newTestNode(t)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
node3 := newTestNode(t)
defer node3.stop()
node1.c.Configure("foo", []cluster.RemoteNode{node2.nodeInfo})
node1.c.Configure("bar", []cluster.RemoteNode{node3.nodeInfo})
node2.c.Configure("foo", []cluster.RemoteNode{node1.nodeInfo})
node3.c.Configure("bar", []cluster.RemoteNode{node1.nodeInfo})
t.Run("Correct channel", func(t *testing.T) {
var fromNode2 sync.WaitGroup
fromNode2.Add(1)
node1.handler.On("OnSubmit", "foo", node2.nodeInfo.ID, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
fromNode2.Done()
}).Once()
var fromNode3 sync.WaitGroup
fromNode3.Add(1)
node1.handler.On("OnSubmit", "bar", node3.nodeInfo.ID, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
fromNode3.Done()
}).Once()
node2toNode1, err := node2.c.Remote("foo", node1.nodeInfo.ID)
require.NoError(t, err)
node3toNode1, err := node3.c.Remote("bar", node1.nodeInfo.ID)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, node2toNode1)
stream.Send(fooReq)
fromNode2.Wait()
node1.handler.AssertNumberOfCalls(t, "OnSubmit", 1)
stream = assertEventualEstablishStream(t, node3toNode1)
stream.Send(barReq)
fromNode3.Wait()
node1.handler.AssertNumberOfCalls(t, "OnSubmit", 2)
})
t.Run("Incorrect channel", func(t *testing.T) {
node1.handler.On("OnSubmit", "foo", node2.nodeInfo.ID, mock.Anything).Return(nil)
node1.handler.On("OnSubmit", "bar", node3.nodeInfo.ID, mock.Anything).Return(nil)
node2toNode1, err := node2.c.Remote("foo", node1.nodeInfo.ID)
require.NoError(t, err)
node3toNode1, err := node3.c.Remote("bar", node1.nodeInfo.ID)
require.NoError(t, err)
assertEventualSendMessage(t, node2toNode1, &orderer.SubmitRequest{Channel: "foo"})
require.NoError(t, err)
stream, err := node2toNode1.NewStream(time.Hour)
require.NoError(t, err)
err = stream.Send(barReq)
require.NoError(t, err)
_, err = stream.Recv()
require.EqualError(t, err, "rpc error: code = Unknown desc = certificate extracted from TLS connection isn't authorized")
assertEventualSendMessage(t, node3toNode1, &orderer.SubmitRequest{Channel: "bar"})
stream, err = node3toNode1.NewStream(time.Hour)
require.NoError(t, err)
err = stream.Send(fooReq)
require.NoError(t, err)
_, err = stream.Recv()
require.EqualError(t, err, "rpc error: code = Unknown desc = certificate extracted from TLS connection isn't authorized")
})
}

func TestConnectionFailure(t *testing.T) {
// Scenario: node 1 fails to connect to node 2.
node1 := newTestNode(t)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
dialer := &mocks.SecureDialer{}
dialer.On("Dial", mock.Anything, mock.Anything).Return(nil, errors.New("oops"))
node1.c.Connections = cluster.NewConnectionStore(dialer, &disabled.Gauge{})
node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
_, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.EqualError(t, err, "oops")
}
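
// testMetrics aggregates the fake metric objects used to observe the communication layer.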
type testMetrics struct {
fakeProvider *mocks.MetricsProvider
egressQueueLength metricsfakes.Gauge
egressQueueCapacity metricsfakes.Gauge
egressStreamCount metricsfakes.Gauge
egressTLSConnCount metricsfakes.Gauge
egressWorkerSize metricsfakes.Gauge
ingressStreamsCount metricsfakes.Gauge
msgSendTime metricsfakes.Histogram
msgDropCount metricsfakes.Counter
}
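
// initialize wires the fake metrics into the fake provider,
// so cluster.NewMetrics hands them back to the code under test.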
func (tm *testMetrics) initialize() {
tm.egressQueueLength.WithReturns(&tm.egressQueueLength)
tm.egressQueueCapacity.WithReturns(&tm.egressQueueCapacity)
tm.egressStreamCount.WithReturns(&tm.egressStreamCount)
tm.egressTLSConnCount.WithReturns(&tm.egressTLSConnCount)
tm.egressWorkerSize.WithReturns(&tm.egressWorkerSize)
tm.ingressStreamsCount.WithReturns(&tm.ingressStreamsCount)
tm.msgSendTime.WithReturns(&tm.msgSendTime)
tm.msgDropCount.WithReturns(&tm.msgDropCount)
fakeProvider := tm.fakeProvider
fakeProvider.On("NewGauge", cluster.IngressStreamsCountOpts).Return(&tm.ingressStreamsCount)
fakeProvider.On("NewGauge", cluster.EgressQueueLengthOpts).Return(&tm.egressQueueLength)
fakeProvider.On("NewGauge", cluster.EgressQueueCapacityOpts).Return(&tm.egressQueueCapacity)
fakeProvider.On("NewGauge", cluster.EgressStreamsCountOpts).Return(&tm.egressStreamCount)
fakeProvider.On("NewGauge", cluster.EgressTLSConnectionCountOpts).Return(&tm.egressTLSConnCount)
fakeProvider.On("NewGauge", cluster.EgressWorkersOpts).Return(&tm.egressWorkerSize)
fakeProvider.On("NewCounter", cluster.MessagesDroppedCountOpts).Return(&tm.msgDropCount)
fakeProvider.On("NewHistogram", cluster.MessageSendTimeOpts).Return(&tm.msgSendTime)
}

func TestMetrics(t *testing.T) {
for _, testCase := range []struct {
name string
runTest func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics)
testMetrics *testMetrics
}{
{
name: "EgressQueueOccupancy",
runTest: func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics) {
assertBiDiCommunication(t, node1, node2, testReq)
require.Equal(t, []string{"host", node2.nodeInfo.Endpoint, "msg_type", "transaction", "channel", testChannel},
testMetrics.egressQueueLength.WithArgsForCall(0))
require.Equal(t, float64(0), testMetrics.egressQueueLength.SetArgsForCall(0))
require.Equal(t, float64(1), testMetrics.egressQueueCapacity.SetArgsForCall(0))
var messageReceived sync.WaitGroup
messageReceived.Add(1)
node2.handler.On("OnConsensus", testChannel, node1.nodeInfo.ID, mock.Anything).Run(func(args mock.Arguments) {
messageReceived.Done()
}).Return(nil)
rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, rm)
stream.Send(testConsensusReq)
messageReceived.Wait()
require.Equal(t, []string{"host", node2.nodeInfo.Endpoint, "msg_type", "consensus", "channel", testChannel},
testMetrics.egressQueueLength.WithArgsForCall(1))
require.Equal(t, float64(0), testMetrics.egressQueueLength.SetArgsForCall(1))
require.Equal(t, float64(1), testMetrics.egressQueueCapacity.SetArgsForCall(1))
},
},
{
name: "EgressStreamsCount",
runTest: func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics) {
assertBiDiCommunication(t, node1, node2, testReq)
require.Equal(t, 1, testMetrics.egressStreamCount.SetCallCount())
require.Equal(t, 1, testMetrics.egressStreamCount.WithCallCount())
require.Equal(t, []string{"channel", testChannel}, testMetrics.egressStreamCount.WithArgsForCall(0))
assertBiDiCommunicationForChannel(t, node1, node2, testReq2, testChannel2)
require.Equal(t, 2, testMetrics.egressStreamCount.SetCallCount())
require.Equal(t, 2, testMetrics.egressStreamCount.WithCallCount())
require.Equal(t, []string{"channel", testChannel2}, testMetrics.egressStreamCount.WithArgsForCall(1))
},
},
{
name: "EgressTLSConnCount",
runTest: func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics) {
assertBiDiCommunication(t, node1, node2, testReq)
require.Equal(t, []string{"channel", testChannel}, testMetrics.egressStreamCount.WithArgsForCall(0))
assertBiDiCommunicationForChannel(t, node1, node2, testReq2, testChannel2)
require.Equal(t, []string{"channel", testChannel2}, testMetrics.egressStreamCount.WithArgsForCall(1))
// A single TLS connection despite 2 streams
require.Equal(t, float64(1), testMetrics.egressTLSConnCount.SetArgsForCall(0))
require.Equal(t, 1, testMetrics.egressTLSConnCount.SetCallCount())
},
},
{
name: "EgressWorkerSize",
runTest: func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics) {
assertBiDiCommunication(t, node1, node2, testReq)
require.Equal(t, []string{"channel", testChannel}, testMetrics.egressStreamCount.WithArgsForCall(0))
assertBiDiCommunicationForChannel(t, node1, node2, testReq2, testChannel2)
require.Equal(t, []string{"channel", testChannel2}, testMetrics.egressStreamCount.WithArgsForCall(1))
require.Equal(t, float64(1), testMetrics.egressWorkerSize.SetArgsForCall(0))
require.Equal(t, float64(1), testMetrics.egressWorkerSize.SetArgsForCall(1))
},
},
{
name: "MsgSendTime",
runTest: func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics) {
assertBiDiCommunication(t, node1, node2, testReq)
require.Eventually(t, func() bool { return testMetrics.msgSendTime.ObserveCallCount() > 0 }, time.Second, 10*time.Millisecond)
require.Equal(t, 1, testMetrics.msgSendTime.ObserveCallCount())
require.Equal(t, []string{"host", node2.nodeInfo.Endpoint, "channel", testChannel}, testMetrics.msgSendTime.WithArgsForCall(0))
},
},
{
name: "MsgDropCount",
runTest: func(t *testing.T, node1, node2 *clusterNode, testMetrics *testMetrics) {
blockRecv := make(chan struct{})
wasReported := func() bool {
select {
case <-blockRecv:
return true
default:
return false
}
}
// When the drop count is reported, unblock the server-side receive operation.
testMetrics.msgDropCount.AddStub = func(_ float64) {
if !wasReported() {
close(blockRecv)
}
}
node2.handler.On("OnConsensus", testChannel, node1.nodeInfo.ID, mock.Anything).Run(func(args mock.Arguments) {
// Block until the message drop is reported
<-blockRecv
}).Return(nil)
rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, rm)
// Send too many messages while the server side is not reading from the stream
for {
stream.Send(testConsensusReq)
if wasReported() {
break
}
}
require.Equal(t, []string{"host", node2.nodeInfo.Endpoint, "channel", testChannel},
testMetrics.msgDropCount.WithArgsForCall(0))
require.Equal(t, 1, testMetrics.msgDropCount.AddCallCount())
},
},
} {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
fakeProvider := &mocks.MetricsProvider{}
testCase.testMetrics = &testMetrics{
fakeProvider: fakeProvider,
}
testCase.testMetrics.initialize()
node1 := newTestNodeWithMetrics(t, fakeProvider, &testCase.testMetrics.egressTLSConnCount)
defer node1.stop()
node2 := newTestNode(t)
defer node2.stop()
configForNode1 := []cluster.RemoteNode{node2.nodeInfo}
configForNode2 := []cluster.RemoteNode{node1.nodeInfo}
node1.c.Configure(testChannel, configForNode1)
node2.c.Configure(testChannel, configForNode2)
node1.c.Configure(testChannel2, configForNode1)
node2.c.Configure(testChannel2, configForNode2)
testCase.runTest(t, node1, node2, testCase.testMetrics)
})
}
}

func TestCertExpirationWarningEgress(t *testing.T) {
// Scenario: Ensures that when certificates are due to expire,
// a warning is logged.
node1 := newTestNode(t)
node2 := newTestNode(t)
cert, err := x509.ParseCertificate(node2.nodeInfo.ServerTLSCert)
require.NoError(t, err)
require.NotNil(t, cert)
// Let the NotAfter time of the certificate be T1, the current time be T0.
// So time.Until is (T1 - T0), which means we have (T1 - T0) time left.
// We want to trigger a warning, so we set the warning threshold to be 20 seconds above
// the time left, so the time left would be smaller than the threshold.
node1.c.CertExpWarningThreshold = time.Until(cert.NotAfter) + time.Second*20
// We only alert once in 3 seconds
node1.c.MinimumExpirationWarningInterval = time.Second * 3
defer node1.stop()
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
stub, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil)
mockgRPC := &mocks.StepClient{}
mockgRPC.On("Send", mock.Anything).Return(nil)
mockgRPC.On("Context").Return(context.Background())
mockClient := &mocks.ClusterClient{}
mockClient.On("Step", mock.Anything).Return(mockgRPC, nil)
stream := assertEventualEstablishStream(t, stub)
alerts := make(chan struct{}, 100)
stream.Logger = stream.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "expires in less than") {
alerts <- struct{}{}
}
return nil
}))
// Send a message to the node and expect an alert to be logged.
stream.Send(wrapSubmitReq(testReq))
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have logged an alert")
}
// Send another message, and ensure nothing is logged, because
// alerts are suppressed until the minimum warning interval elapses.
stream.Send(wrapSubmitReq(testReq))
select {
case <-alerts:
t.Fatal("Should not have logged an alert")
case <-time.After(time.Millisecond * 500):
}
// Wait enough time for the alert interval to clear.
time.Sleep(node1.c.MinimumExpirationWarningInterval + time.Second)
// Send a message again; this time an alert should be logged again.
stream.Send(wrapSubmitReq(testReq))
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have logged an alert")
}
}
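
// assertBiDiCommunicationForChannel verifies that msgToSend is delivered
// in both directions between node1 and node2 on the given channel.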
func assertBiDiCommunicationForChannel(t *testing.T, node1, node2 *clusterNode, msgToSend *orderer.SubmitRequest, channel string) {
establish := []struct {
label string
sender *clusterNode
receiver *clusterNode
target uint64
}{
{label: "1->2", sender: node1, target: node2.nodeInfo.ID, receiver: node2},
{label: "2->1", sender: node2, target: node1.nodeInfo.ID, receiver: node1},
}
for _, estab := range establish {
stub, err := estab.sender.c.Remote(channel, estab.target)
require.NoError(t, err)
stream := assertEventualEstablishStream(t, stub)
var wg sync.WaitGroup
wg.Add(1)
estab.receiver.handler.On("OnSubmit", channel, estab.sender.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
req := args.Get(2).(*orderer.SubmitRequest)
require.True(t, proto.Equal(req, msgToSend))
t.Log(estab.label)
wg.Done()
})
err = stream.Send(wrapSubmitReq(msgToSend))
require.NoError(t, err)
wg.Wait()
}
}
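
// assertBiDiCommunication verifies bidirectional delivery on the default test channel.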
func assertBiDiCommunication(t *testing.T, node1, node2 *clusterNode, msgToSend *orderer.SubmitRequest) {
assertBiDiCommunicationForChannel(t, node1, node2, msgToSend, testChannel)
}
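
// assertEventualEstablishStream retries NewStream until a stream is established,
// failing the test on timeout.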
func assertEventualEstablishStream(t *testing.T, rpc *cluster.RemoteContext) *cluster.Stream {
var res *cluster.Stream
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() error {
stream, err := rpc.NewStream(time.Hour)
res = stream
return err
}, timeout).Should(gomega.Succeed())
return res
}
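
// assertEventualSendMessage retries until a stream is established and req is sent,
// failing the test on timeout.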
func assertEventualSendMessage(t *testing.T, rpc *cluster.RemoteContext, req *orderer.SubmitRequest) *cluster.Stream {
var res *cluster.Stream
gt := gomega.NewGomegaWithT(t)
gt.Eventually(func() error {
stream, err := rpc.NewStream(time.Hour)
if err != nil {
return err
}
res = stream
return stream.Send(wrapSubmitReq(req))
}, timeout).Should(gomega.Succeed())
return res
}
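
// wrapSubmitReq wraps a SubmitRequest in a StepRequest envelope.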
func wrapSubmitReq(req *orderer.SubmitRequest) *orderer.StepRequest {
return &orderer.StepRequest{
Payload: &orderer.StepRequest_SubmitRequest{
SubmitRequest: req,
},
}
}