531 lines
16 KiB
Go
531 lines
16 KiB
Go
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/
|
|
|
|
package cluster_test
|
|
|
|
import (
|
|
"crypto"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
crypto2 "github.com/hyperledger/fabric/common/crypto"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/hyperledger/fabric-protos-go/common"
|
|
"github.com/hyperledger/fabric-protos-go/orderer"
|
|
"github.com/hyperledger/fabric/common/flogging"
|
|
"github.com/hyperledger/fabric/common/metrics/disabled"
|
|
comm_utils "github.com/hyperledger/fabric/internal/pkg/comm"
|
|
"github.com/hyperledger/fabric/orderer/common/cluster"
|
|
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
|
|
"github.com/onsi/gomega"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type clusterServiceNode struct {
|
|
nodeInfo cluster.RemoteNode
|
|
bindAddress string
|
|
server *comm_utils.GRPCServer
|
|
cli *cluster.AuthCommMgr
|
|
service *cluster.ClusterService
|
|
handler *mocks.Handler
|
|
serverConfig comm_utils.ServerConfig
|
|
}
|
|
|
|
func (csn *clusterServiceNode) resurrect() {
|
|
gRPCServer, err := comm_utils.NewGRPCServer(csn.bindAddress, csn.serverConfig)
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed starting gRPC server: %v", err))
|
|
}
|
|
csn.server = gRPCServer
|
|
orderer.RegisterClusterNodeServiceServer(gRPCServer.Server(), csn.service)
|
|
go csn.server.Start()
|
|
}
|
|
|
|
// signingIdentity wraps a crypto.Signer and exposes a message-level Sign
// method that hashes the message before delegating to the wrapped signer.
type signingIdentity struct {
	crypto.Signer
}

// Sign computes the SHA-256 digest of msg and signs that digest with the
// embedded signer, using crypto/rand as the entropy source. It returns the
// signature produced by the signer, or the signer's error.
func (si *signingIdentity) Sign(msg []byte) ([]byte, error) {
	digest := sha256.Sum256(msg)
	// The crypto.Signer contract expects opts to name the hash that produced
	// the digest; signers that consult opts.HashFunc() would panic on nil.
	return si.Signer.Sign(rand.Reader, digest[:], crypto.SHA256)
}
|
|
|
|
func newClusterServiceNode(t *testing.T) *clusterServiceNode {
|
|
serverKeyPair, err := ca.NewServerCertKeyPair("127.0.0.1")
|
|
require.NoError(t, err)
|
|
|
|
serverKeyPair.Cert, err = crypto2.SanitizeX509Cert(serverKeyPair.Cert)
|
|
require.NoError(t, err)
|
|
|
|
srvConfig := comm_utils.ServerConfig{
|
|
SecOpts: comm_utils.SecureOptions{
|
|
Key: serverKeyPair.Key,
|
|
Certificate: serverKeyPair.Cert,
|
|
UseTLS: true,
|
|
},
|
|
}
|
|
gRPCServer, err := comm_utils.NewGRPCServer("127.0.0.1:", srvConfig)
|
|
require.NoError(t, err)
|
|
|
|
handler := &mocks.Handler{}
|
|
|
|
tstSrv := &cluster.ClusterService{
|
|
StreamCountReporter: &cluster.StreamCountReporter{
|
|
Metrics: cluster.NewMetrics(&disabled.Provider{}),
|
|
},
|
|
Logger: flogging.MustGetLogger("test"),
|
|
StepLogger: flogging.MustGetLogger("test"),
|
|
RequestHandler: handler,
|
|
MembershipByChannel: make(map[string]*cluster.ChannelMembersConfig),
|
|
NodeIdentity: serverKeyPair.Cert,
|
|
}
|
|
|
|
orderer.RegisterClusterNodeServiceServer(gRPCServer.Server(), tstSrv)
|
|
go gRPCServer.Start()
|
|
|
|
clientConfig := comm_utils.ClientConfig{
|
|
AsyncConnect: true,
|
|
DialTimeout: time.Hour,
|
|
SecOpts: comm_utils.SecureOptions{
|
|
ServerRootCAs: [][]byte{ca.CertBytes()},
|
|
UseTLS: true,
|
|
ClientRootCAs: [][]byte{ca.CertBytes()},
|
|
},
|
|
}
|
|
|
|
clientKeyPair, err := ca.NewClientCertKeyPair()
|
|
require.NoError(t, err)
|
|
|
|
clientKeyPair.Cert, err = crypto2.SanitizeX509Cert(clientKeyPair.Cert)
|
|
require.NoError(t, err)
|
|
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed creating client certificate %v", err))
|
|
}
|
|
cli := &cluster.AuthCommMgr{
|
|
SendBufferSize: 1,
|
|
Logger: flogging.MustGetLogger("test"),
|
|
Chan2Members: make(cluster.MembersByChannel),
|
|
Connections: cluster.NewConnectionMgr(clientConfig),
|
|
Metrics: cluster.NewMetrics(&disabled.Provider{}),
|
|
Signer: &signingIdentity{serverKeyPair.Signer},
|
|
NodeIdentity: clientKeyPair.Cert,
|
|
}
|
|
|
|
node := &clusterServiceNode{
|
|
server: gRPCServer,
|
|
service: tstSrv,
|
|
cli: cli,
|
|
nodeInfo: cluster.RemoteNode{
|
|
NodeAddress: cluster.NodeAddress{
|
|
Endpoint: gRPCServer.Address(),
|
|
ID: nextUnusedID(),
|
|
},
|
|
NodeCerts: cluster.NodeCerts{
|
|
ServerRootCA: [][]byte{ca.CertBytes()},
|
|
Identity: clientKeyPair.Cert,
|
|
},
|
|
},
|
|
handler: handler,
|
|
serverConfig: srvConfig,
|
|
bindAddress: gRPCServer.Address(),
|
|
}
|
|
|
|
return node
|
|
}
|
|
|
|
func TestCommServiceBasicAuthentication(t *testing.T) {
|
|
// Scenario: Basic test that spawns 2 nodes and sends each other
|
|
// messages that are expected to be echoed back
|
|
t.Parallel()
|
|
|
|
node1 := newClusterServiceNode(t)
|
|
defer node1.server.Stop()
|
|
defer node1.cli.Shutdown()
|
|
|
|
node2 := newClusterServiceNode(t)
|
|
defer node2.server.Stop()
|
|
defer node2.cli.Shutdown()
|
|
|
|
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
|
|
|
|
node1.cli.Configure(testChannel, config)
|
|
node2.cli.Configure(testChannel, config)
|
|
|
|
node1.service.ConfigureNodeCerts(testChannel, []*common.Consenter{{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity}, {Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity}})
|
|
node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity}, {Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity}})
|
|
|
|
assertBiDiCommunicationForChannelWithSigner(t, node1, node2, testReq, testChannel)
|
|
}
|
|
|
|
func TestCommServiceMultiChannelAuth(t *testing.T) {
|
|
// Scenario: node 1 knows node 2 only in channel "foo"
|
|
// and knows node 3 only in channel "bar".
|
|
// Messages that are received, are routed according to their corresponding channels
|
|
t.Parallel()
|
|
|
|
node1 := newClusterServiceNode(t)
|
|
node2 := newClusterServiceNode(t)
|
|
node3 := newClusterServiceNode(t)
|
|
|
|
defer node1.server.Stop()
|
|
defer node2.server.Stop()
|
|
defer node3.server.Stop()
|
|
defer node1.cli.Shutdown()
|
|
defer node2.cli.Shutdown()
|
|
defer node3.cli.Shutdown()
|
|
|
|
consenterSet1 := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
|
|
consenterSet2 := []cluster.RemoteNode{node1.nodeInfo, node3.nodeInfo}
|
|
|
|
node1.cli.Configure("foo", consenterSet1)
|
|
node1.cli.Configure("bar", consenterSet2)
|
|
node2.cli.Configure("foo", consenterSet1)
|
|
node3.cli.Configure("bar", consenterSet2)
|
|
|
|
var fromNode2 sync.WaitGroup
|
|
fromNode2.Add(1)
|
|
node1.handler.On("OnSubmit", "foo", node2.nodeInfo.ID, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
|
|
fromNode2.Done()
|
|
}).Once()
|
|
|
|
var fromNode3 sync.WaitGroup
|
|
fromNode3.Add(1)
|
|
node1.handler.On("OnSubmit", "bar", node3.nodeInfo.ID, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
|
|
fromNode3.Done()
|
|
}).Once()
|
|
|
|
node1.service.ConfigureNodeCerts("foo", []*common.Consenter{{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity}, {Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity}})
|
|
node1.service.ConfigureNodeCerts("bar", []*common.Consenter{{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity}, {Id: uint32(node3.nodeInfo.ID), Identity: node3.service.NodeIdentity}})
|
|
|
|
node2toNode1, err := node2.cli.Remote("foo", node1.nodeInfo.ID)
|
|
require.NoError(t, err)
|
|
|
|
node3toNode1, err := node3.cli.Remote("bar", node1.nodeInfo.ID)
|
|
require.NoError(t, err)
|
|
|
|
stream := assertEventualEstablishStreamWithSigner(t, node2toNode1)
|
|
stream.Send(fooReq)
|
|
|
|
fromNode2.Wait()
|
|
node1.handler.AssertNumberOfCalls(t, "OnSubmit", 1)
|
|
|
|
stream = assertEventualEstablishStreamWithSigner(t, node3toNode1)
|
|
stream.Send(barReq)
|
|
|
|
fromNode3.Wait()
|
|
node1.handler.AssertNumberOfCalls(t, "OnSubmit", 2)
|
|
}
|
|
|
|
func TestCommServiceMembershipReconfigurationAuth(t *testing.T) {
|
|
// Scenario: node 1 and node 2 are started up
|
|
// and node 2 is configured to know about node 1,
|
|
// without node1 knowing about node 2.
|
|
// The communication between them should only work
|
|
// after node 1 is configured to know about node 2.
|
|
t.Parallel()
|
|
|
|
node1 := newClusterServiceNode(t)
|
|
node2 := newClusterServiceNode(t)
|
|
|
|
defer node1.server.Stop()
|
|
defer node2.server.Stop()
|
|
|
|
defer node1.cli.Shutdown()
|
|
defer node2.cli.Shutdown()
|
|
|
|
// node2 is not part of the node1's testChannel consenters set
|
|
node1.cli.Configure(testChannel, []cluster.RemoteNode{})
|
|
node1.service.ConfigureNodeCerts(testChannel, []*common.Consenter{})
|
|
|
|
consenterSet1 := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
|
|
node2.cli.Configure(testChannel, consenterSet1)
|
|
node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
|
|
{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
|
|
{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
|
|
})
|
|
|
|
// Node 1 can't connect to node 2 because it doesn't know its TLS certificate yet
|
|
_, err := node1.cli.Remote(testChannel, node2.nodeInfo.ID)
|
|
require.EqualError(t, err, fmt.Sprintf("node %d doesn't exist in channel test's membership", node2.nodeInfo.ID))
|
|
|
|
// Node 2 can connect to node 1, but it can't send messages because node 1 doesn't know node 2 yet.
|
|
gt := gomega.NewGomegaWithT(t)
|
|
gt.Eventually(func() (bool, error) {
|
|
_, err := node2.cli.Remote(testChannel, node1.nodeInfo.ID)
|
|
return true, err
|
|
}, time.Minute).Should(gomega.BeTrue())
|
|
|
|
stub, err := node2.cli.Remote(testChannel, node1.nodeInfo.ID)
|
|
require.NoError(t, err)
|
|
|
|
// node2 is able to establish the connection with node1 and
|
|
// create the client stream but send failed during the authentication
|
|
// which is known to client stream when it calls recv
|
|
stream := assertEventualEstablishStreamWithSigner(t, stub)
|
|
err = stream.Send(wrapSubmitReq(testSubReq))
|
|
require.NoError(t, err)
|
|
|
|
_, err = stream.Recv()
|
|
require.EqualError(t, err, "rpc error: code = Unauthenticated desc = access denied")
|
|
|
|
// Next, configure node 1 to know about node 2
|
|
node1.cli.Configure(testChannel, consenterSet1)
|
|
node1.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
|
|
{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
|
|
{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
|
|
})
|
|
|
|
// Check that the communication works correctly between both nodes
|
|
assertBiDiCommunicationForChannelWithSigner(t, node1, node2, testReq, testChannel)
|
|
|
|
// Reconfigure node 2 to forget about node 1
|
|
node2.cli.Configure(testChannel, []cluster.RemoteNode{})
|
|
node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{})
|
|
|
|
// Node 1 can still connect to node 2
|
|
stub, err = node1.cli.Remote(testChannel, node2.nodeInfo.ID)
|
|
require.NoError(t, err)
|
|
|
|
// But can't send a message because node 2 now doesn't authorized node 1
|
|
stream = assertEventualEstablishStreamWithSigner(t, stub)
|
|
stream.Send(wrapSubmitReq(testSubReq))
|
|
require.NoError(t, err)
|
|
|
|
_, err = stream.Recv()
|
|
require.EqualError(t, err, "rpc error: code = Unauthenticated desc = access denied")
|
|
|
|
// Reconfigure node 2 to know about node 1
|
|
node2.cli.Configure(testChannel, consenterSet1)
|
|
node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
|
|
{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
|
|
{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
|
|
})
|
|
|
|
// Node 1 can still connect to node 2
|
|
stub, err = node1.cli.Remote(testChannel, node2.nodeInfo.ID)
|
|
require.NoError(t, err)
|
|
|
|
// node 1 can now send a message to node 2 now
|
|
stream = assertEventualEstablishStreamWithSigner(t, stub)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
|
|
wg.Done()
|
|
})
|
|
|
|
stream.Send(wrapSubmitReq(testSubReq))
|
|
require.NoError(t, err)
|
|
|
|
wg.Wait()
|
|
|
|
// Reconfigure the node 2 consenters set while the stream is active
|
|
// stream should not be affected as the node 1 still part of the consenter set
|
|
node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
|
|
{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
|
|
{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
|
|
})
|
|
|
|
wg.Add(1)
|
|
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
|
|
wg.Done()
|
|
})
|
|
|
|
err = stream.Send(wrapSubmitReq(testSubReq))
|
|
require.NoError(t, err)
|
|
|
|
wg.Wait()
|
|
|
|
// Reconfigure the node 2 consenters set removing the node1
|
|
// now the stream marked as stale
|
|
node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
|
|
{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
|
|
})
|
|
|
|
err = stream.Send(wrapSubmitReq(testSubReq))
|
|
require.NoError(t, err)
|
|
|
|
_, err = stream.Recv()
|
|
require.EqualError(t, err, "rpc error: code = Unknown desc = stream 2 is stale")
|
|
}
|
|
|
|
func TestCommServiceReconnectAuth(t *testing.T) {
|
|
// Scenario: node 1 and node 2 are connected,
|
|
// and node 2 is taken offline.
|
|
// Node 1 tries to send a message to node 2 but fails,
|
|
// and afterwards node 2 is brought back, after which
|
|
// node 1 sends more messages, and it should succeed
|
|
// sending a message to node 2 eventually.
|
|
t.Parallel()
|
|
|
|
node1 := newClusterServiceNode(t)
|
|
node2 := newClusterServiceNode(t)
|
|
|
|
defer node1.server.Stop()
|
|
defer node2.server.Stop()
|
|
|
|
defer node1.cli.Shutdown()
|
|
defer node2.cli.Shutdown()
|
|
|
|
node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil)
|
|
|
|
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
|
|
node1.cli.Configure(testChannel, config)
|
|
node2.cli.Configure(testChannel, config)
|
|
|
|
// Make node 2 be offline by shutting down its gRPC service
|
|
node2.server.Stop()
|
|
// Obtain the stub for node 2.
|
|
// Should succeed, because the connection was created at time of configuration
|
|
stub, err := node1.cli.Remote(testChannel, node2.nodeInfo.ID)
|
|
require.NoError(t, err)
|
|
|
|
// Try to obtain a stream. Should not Succeed.
|
|
gt := gomega.NewGomegaWithT(t)
|
|
gt.Eventually(func() error {
|
|
_, err = stub.NewStream(time.Hour)
|
|
return err
|
|
}).Should(gomega.Not(gomega.Succeed()))
|
|
|
|
// Wait for the port to be released
|
|
for {
|
|
lsnr, err := net.Listen("tcp", node2.nodeInfo.Endpoint)
|
|
if err == nil {
|
|
lsnr.Close()
|
|
break
|
|
}
|
|
}
|
|
|
|
// Resurrect node 2
|
|
node2.resurrect()
|
|
// Send a message from node 1 to node 2.
|
|
// Should succeed eventually
|
|
assertEventualSendMessageWithSigner(t, stub, testReq)
|
|
}
|
|
|
|
func TestBuildStepRespone(t *testing.T) {
|
|
t.Parallel()
|
|
tcs := []struct {
|
|
name string
|
|
input *orderer.ClusterNodeServiceStepResponse
|
|
res *orderer.StepResponse
|
|
err string
|
|
}{
|
|
{
|
|
name: "input is nil",
|
|
input: nil,
|
|
res: nil,
|
|
err: "input response object is nil",
|
|
},
|
|
{
|
|
name: "complete node step response",
|
|
input: &orderer.ClusterNodeServiceStepResponse{
|
|
Payload: &orderer.ClusterNodeServiceStepResponse_TranorderRes{
|
|
TranorderRes: &orderer.TransactionOrderResponse{
|
|
Channel: "test",
|
|
Status: common.Status_SUCCESS,
|
|
},
|
|
},
|
|
},
|
|
res: &orderer.StepResponse{
|
|
Payload: &orderer.StepResponse_SubmitRes{
|
|
SubmitRes: &orderer.SubmitResponse{
|
|
Channel: "test",
|
|
Status: common.Status_SUCCESS,
|
|
},
|
|
},
|
|
},
|
|
err: "",
|
|
},
|
|
{
|
|
name: "input invalid object",
|
|
input: &orderer.ClusterNodeServiceStepResponse{
|
|
Payload: nil,
|
|
},
|
|
res: nil,
|
|
err: "service stream returned with invalid response type",
|
|
},
|
|
}
|
|
for _, tc := range tcs {
|
|
tc := tc
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
res, err := cluster.BuildStepRespone(tc.input)
|
|
if err != nil {
|
|
require.EqualError(t, err, tc.err)
|
|
} else {
|
|
require.EqualValues(t, tc.res, res)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func assertEventualSendMessageWithSigner(t *testing.T, rpc *cluster.RemoteContext, req *orderer.SubmitRequest) *cluster.Stream {
|
|
var res *cluster.Stream
|
|
gt := gomega.NewGomegaWithT(t)
|
|
gt.Eventually(func() error {
|
|
stream, err := rpc.NewStream(time.Hour)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
res = stream
|
|
return stream.Send(wrapSubmitReq(req))
|
|
}, timeout).Should(gomega.Succeed())
|
|
return res
|
|
}
|
|
|
|
func assertEventualEstablishStreamWithSigner(t *testing.T, rpc *cluster.RemoteContext) *cluster.Stream {
|
|
var res *cluster.Stream
|
|
gt := gomega.NewGomegaWithT(t)
|
|
gt.Eventually(func() error {
|
|
stream, err := rpc.NewStream(time.Hour)
|
|
res = stream
|
|
return err
|
|
}, timeout).Should(gomega.Succeed())
|
|
return res
|
|
}
|
|
|
|
func assertBiDiCommunicationForChannelWithSigner(t *testing.T, node1, node2 *clusterServiceNode, msgToSend *orderer.SubmitRequest, channel string) {
|
|
establish := []struct {
|
|
label string
|
|
sender *clusterServiceNode
|
|
receiver *clusterServiceNode
|
|
target uint64
|
|
}{
|
|
{label: "1->2", sender: node1, target: node2.nodeInfo.ID, receiver: node2},
|
|
{label: "2->1", sender: node2, target: node1.nodeInfo.ID, receiver: node1},
|
|
}
|
|
for _, estab := range establish {
|
|
stub, err := estab.sender.cli.Remote(channel, estab.target)
|
|
require.NoError(t, err)
|
|
|
|
stream := assertEventualEstablishStreamWithSigner(t, stub)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
|
|
estab.receiver.handler.On("OnSubmit", channel, estab.sender.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
|
|
req := args.Get(2).(*orderer.SubmitRequest)
|
|
require.True(t, proto.Equal(req, msgToSend))
|
|
t.Log(estab.label)
|
|
wg.Done()
|
|
})
|
|
|
|
err = stream.Send(wrapSubmitReq(msgToSend))
|
|
require.NoError(t, err)
|
|
|
|
wg.Wait()
|
|
}
|
|
}
|