106 lines
3.1 KiB
Go
106 lines
3.1 KiB
Go
/*
|
|
Copyright IBM Corp. 2017 All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package cluster_test
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/hyperledger/fabric/common/metrics/disabled"
|
|
"github.com/hyperledger/fabric/orderer/common/cluster"
|
|
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func TestConcurrentConnections(t *testing.T) {
|
|
// Scenario: Have 100 goroutines try to create a connection together at the same time,
|
|
// wait until one of them succeeds, and then wait until they all return,
|
|
// and also ensure they all return the same connection reference
|
|
n := 100
|
|
var wg sync.WaitGroup
|
|
wg.Add(n)
|
|
dialer := &mocks.SecureDialer{}
|
|
conn := &grpc.ClientConn{}
|
|
dialer.On("Dial", mock.Anything, mock.Anything).Return(conn, nil)
|
|
connStore := cluster.NewConnectionStore(dialer, &disabled.Gauge{})
|
|
connect := func() {
|
|
defer wg.Done()
|
|
conn2, err := connStore.Connection("", nil)
|
|
require.NoError(t, err)
|
|
require.True(t, conn2 == conn)
|
|
}
|
|
for i := 0; i < n; i++ {
|
|
go connect()
|
|
}
|
|
wg.Wait()
|
|
dialer.AssertNumberOfCalls(t, "Dial", 1)
|
|
}
|
|
|
|
type connectionMapperSpy struct {
|
|
lookupDelay chan struct{}
|
|
lookupInvoked chan struct{}
|
|
cluster.ConnectionMapper
|
|
}
|
|
|
|
func (cms *connectionMapperSpy) Lookup(cert []byte) (*grpc.ClientConn, bool) {
|
|
// Signal that Lookup() has been invoked
|
|
cms.lookupInvoked <- struct{}{}
|
|
// Wait for the main test to signal to advance.
|
|
// This is needed because we need to ensure that all instances
|
|
// of the connectionMapperSpy invoked Lookup()
|
|
<-cms.lookupDelay
|
|
return cms.ConnectionMapper.Lookup(cert)
|
|
}
|
|
|
|
func TestConcurrentLookupMiss(t *testing.T) {
|
|
// Scenario: 2 concurrent connection attempts are made,
|
|
// and the first 2 Lookup operations are delayed,
|
|
// which makes the connection store attempt to connect
|
|
// at the same time twice.
|
|
// A single connection should be created regardless.
|
|
|
|
dialer := &mocks.SecureDialer{}
|
|
conn := &grpc.ClientConn{}
|
|
dialer.On("Dial", mock.Anything, mock.Anything).Return(conn, nil)
|
|
|
|
connStore := cluster.NewConnectionStore(dialer, &disabled.Gauge{})
|
|
// Wrap the connection mapping with a spy that intercepts Lookup() invocations
|
|
spy := &connectionMapperSpy{
|
|
ConnectionMapper: connStore.Connections,
|
|
lookupDelay: make(chan struct{}, 2),
|
|
lookupInvoked: make(chan struct{}, 2),
|
|
}
|
|
connStore.Connections = spy
|
|
|
|
var goroutinesExited sync.WaitGroup
|
|
goroutinesExited.Add(2)
|
|
|
|
for i := 0; i < 2; i++ {
|
|
go func() {
|
|
defer goroutinesExited.Done()
|
|
conn2, err := connStore.Connection("", nil)
|
|
require.NoError(t, err)
|
|
// Ensure all calls for Connection() return the same reference
|
|
// of the gRPC connection.
|
|
require.True(t, conn2 == conn)
|
|
}()
|
|
}
|
|
// Wait for the Lookup() to be invoked by both
|
|
// goroutines
|
|
<-spy.lookupInvoked
|
|
<-spy.lookupInvoked
|
|
// Signal the goroutines to finish the Lookup() invocations
|
|
spy.lookupDelay <- struct{}{}
|
|
spy.lookupDelay <- struct{}{}
|
|
// Close the channel so that subsequent Lookup() operations won't be blocked
|
|
close(spy.lookupDelay)
|
|
// Wait for all goroutines to exit
|
|
goroutinesExited.Wait()
|
|
}
|