/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package cluster_test

import (
	"crypto"
	"crypto/rand"
	"crypto/sha256"
	"fmt"
	"net"
	"sync"
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric-protos-go/common"
	"github.com/hyperledger/fabric-protos-go/orderer"
	crypto2 "github.com/hyperledger/fabric/common/crypto"
	"github.com/hyperledger/fabric/common/flogging"
	"github.com/hyperledger/fabric/common/metrics/disabled"
	comm_utils "github.com/hyperledger/fabric/internal/pkg/comm"
	"github.com/hyperledger/fabric/orderer/common/cluster"
	"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
	"github.com/onsi/gomega"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)
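
// clusterServiceNode groups everything a test needs to drive one cluster member:
// the gRPC server hosting the ClusterService, the mock request handler behind it,
// the authenticated client-side communication manager, and the node's membership info.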
type clusterServiceNode struct {
	nodeInfo     cluster.RemoteNode
	bindAddress  string
	server       *comm_utils.GRPCServer
	cli          *cluster.AuthCommMgr
	service      *cluster.ClusterService
	handler      *mocks.Handler
	serverConfig comm_utils.ServerConfig
}
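
// resurrect restarts the node's gRPC server on its original bind address and
// re-registers the cluster service, bringing a previously stopped node back online.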
func (csn *clusterServiceNode) resurrect() {
	gRPCServer, err := comm_utils.NewGRPCServer(csn.bindAddress, csn.serverConfig)
	if err != nil {
		panic(fmt.Errorf("failed starting gRPC server: %v", err))
	}
	csn.server = gRPCServer
	orderer.RegisterClusterNodeServiceServer(gRPCServer.Server(), csn.service)
	go csn.server.Start()
}
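
// signingIdentity adapts a crypto.Signer so that Sign hashes the message with
// SHA-256 and produces the signature over the digest.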
type signingIdentity struct {
	crypto.Signer
}

func (si *signingIdentity) Sign(msg []byte) ([]byte, error) {
	digest := sha256.Sum256(msg)
	return si.Signer.Sign(rand.Reader, digest[:], nil)
}
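
// newClusterServiceNode starts a TLS-enabled ClusterService backed by a mock
// request handler, wires up an AuthCommMgr client that signs its requests, and
// returns both wrapped in a clusterServiceNode.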
func newClusterServiceNode(t *testing.T) *clusterServiceNode {
	serverKeyPair, err := ca.NewServerCertKeyPair("127.0.0.1")
	require.NoError(t, err)
	serverKeyPair.Cert, err = crypto2.SanitizeX509Cert(serverKeyPair.Cert)
	require.NoError(t, err)

	srvConfig := comm_utils.ServerConfig{
		SecOpts: comm_utils.SecureOptions{
			Key:         serverKeyPair.Key,
			Certificate: serverKeyPair.Cert,
			UseTLS:      true,
		},
	}
	gRPCServer, err := comm_utils.NewGRPCServer("127.0.0.1:", srvConfig)
	require.NoError(t, err)

	handler := &mocks.Handler{}
	tstSrv := &cluster.ClusterService{
		StreamCountReporter: &cluster.StreamCountReporter{
			Metrics: cluster.NewMetrics(&disabled.Provider{}),
		},
		Logger:              flogging.MustGetLogger("test"),
		StepLogger:          flogging.MustGetLogger("test"),
		RequestHandler:      handler,
		MembershipByChannel: make(map[string]*cluster.ChannelMembersConfig),
		NodeIdentity:        serverKeyPair.Cert,
	}
	orderer.RegisterClusterNodeServiceServer(gRPCServer.Server(), tstSrv)
	go gRPCServer.Start()

	clientConfig := comm_utils.ClientConfig{
		AsyncConnect: true,
		DialTimeout:  time.Hour,
		SecOpts: comm_utils.SecureOptions{
			ServerRootCAs: [][]byte{ca.CertBytes()},
			UseTLS:        true,
			ClientRootCAs: [][]byte{ca.CertBytes()},
		},
	}
	clientKeyPair, err := ca.NewClientCertKeyPair()
	require.NoError(t, err)
	clientKeyPair.Cert, err = crypto2.SanitizeX509Cert(clientKeyPair.Cert)
	require.NoError(t, err)
	cli := &cluster.AuthCommMgr{
		SendBufferSize: 1,
		Logger:         flogging.MustGetLogger("test"),
		Chan2Members:   make(cluster.MembersByChannel),
		Connections:    cluster.NewConnectionMgr(clientConfig),
		Metrics:        cluster.NewMetrics(&disabled.Provider{}),
		Signer:         &signingIdentity{serverKeyPair.Signer},
		NodeIdentity:   clientKeyPair.Cert,
	}
	node := &clusterServiceNode{
		server:  gRPCServer,
		service: tstSrv,
		cli:     cli,
		nodeInfo: cluster.RemoteNode{
			NodeAddress: cluster.NodeAddress{
				Endpoint: gRPCServer.Address(),
				ID:       nextUnusedID(),
			},
			NodeCerts: cluster.NodeCerts{
				ServerRootCA: [][]byte{ca.CertBytes()},
				Identity:     clientKeyPair.Cert,
			},
		},
		handler:      handler,
		serverConfig: srvConfig,
		bindAddress:  gRPCServer.Address(),
	}
	return node
}

func TestCommServiceBasicAuthentication(t *testing.T) {
	// Scenario: Basic test that spawns 2 nodes and sends each other
	// messages that are expected to be echoed back
	t.Parallel()

	node1 := newClusterServiceNode(t)
	defer node1.server.Stop()
	defer node1.cli.Shutdown()

	node2 := newClusterServiceNode(t)
	defer node2.server.Stop()
	defer node2.cli.Shutdown()

	config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
	node1.cli.Configure(testChannel, config)
	node2.cli.Configure(testChannel, config)

	node1.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})
	node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})

	assertBiDiCommunicationForChannelWithSigner(t, node1, node2, testReq, testChannel)
}

func TestCommServiceMultiChannelAuth(t *testing.T) {
	// Scenario: node 1 knows node 2 only in channel "foo"
	// and knows node 3 only in channel "bar".
	// Messages that are received are routed according to their corresponding channels
	t.Parallel()

	node1 := newClusterServiceNode(t)
	node2 := newClusterServiceNode(t)
	node3 := newClusterServiceNode(t)

	defer node1.server.Stop()
	defer node2.server.Stop()
	defer node3.server.Stop()

	defer node1.cli.Shutdown()
	defer node2.cli.Shutdown()
	defer node3.cli.Shutdown()

	consenterSet1 := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
	consenterSet2 := []cluster.RemoteNode{node1.nodeInfo, node3.nodeInfo}

	node1.cli.Configure("foo", consenterSet1)
	node1.cli.Configure("bar", consenterSet2)
	node2.cli.Configure("foo", consenterSet1)
	node3.cli.Configure("bar", consenterSet2)

	var fromNode2 sync.WaitGroup
	fromNode2.Add(1)
	node1.handler.On("OnSubmit", "foo", node2.nodeInfo.ID, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
		fromNode2.Done()
	}).Once()

	var fromNode3 sync.WaitGroup
	fromNode3.Add(1)
	node1.handler.On("OnSubmit", "bar", node3.nodeInfo.ID, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
		fromNode3.Done()
	}).Once()

	node1.service.ConfigureNodeCerts("foo", []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})
	node1.service.ConfigureNodeCerts("bar", []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node3.nodeInfo.ID), Identity: node3.service.NodeIdentity},
	})

	node2toNode1, err := node2.cli.Remote("foo", node1.nodeInfo.ID)
	require.NoError(t, err)
	node3toNode1, err := node3.cli.Remote("bar", node1.nodeInfo.ID)
	require.NoError(t, err)

	stream := assertEventualEstablishStreamWithSigner(t, node2toNode1)
	stream.Send(fooReq)
	fromNode2.Wait()
	node1.handler.AssertNumberOfCalls(t, "OnSubmit", 1)

	stream = assertEventualEstablishStreamWithSigner(t, node3toNode1)
	stream.Send(barReq)
	fromNode3.Wait()
	node1.handler.AssertNumberOfCalls(t, "OnSubmit", 2)
}

func TestCommServiceMembershipReconfigurationAuth(t *testing.T) {
	// Scenario: node 1 and node 2 are started up
	// and node 2 is configured to know about node 1,
	// without node1 knowing about node 2.
	// The communication between them should only work
	// after node 1 is configured to know about node 2.
	t.Parallel()

	node1 := newClusterServiceNode(t)
	node2 := newClusterServiceNode(t)
	defer node1.server.Stop()
	defer node2.server.Stop()
	defer node1.cli.Shutdown()
	defer node2.cli.Shutdown()
	// node2 is not part of node1's consenter set for testChannel
	node1.cli.Configure(testChannel, []cluster.RemoteNode{})
	node1.service.ConfigureNodeCerts(testChannel, []*common.Consenter{})

	consenterSet1 := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
	node2.cli.Configure(testChannel, consenterSet1)
	node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})
	// Node 1 can't connect to node 2 because node 2 is not yet part of node 1's
	// membership for the channel
	_, err := node1.cli.Remote(testChannel, node2.nodeInfo.ID)
	require.EqualError(t, err, fmt.Sprintf("node %d doesn't exist in channel test's membership", node2.nodeInfo.ID))
	// Node 2 can connect to node 1, but it can't send messages because node 1 doesn't know node 2 yet.
	gt := gomega.NewGomegaWithT(t)
	gt.Eventually(func() (bool, error) {
		_, err := node2.cli.Remote(testChannel, node1.nodeInfo.ID)
		return true, err
	}, time.Minute).Should(gomega.BeTrue())

	stub, err := node2.cli.Remote(testChannel, node1.nodeInfo.ID)
	require.NoError(t, err)

	// node2 can establish the connection to node1 and create the client stream,
	// but the send fails during authentication; the client stream only learns
	// about the failure when it calls Recv
	stream := assertEventualEstablishStreamWithSigner(t, stub)
	err = stream.Send(wrapSubmitReq(testSubReq))
	require.NoError(t, err)
	_, err = stream.Recv()
	require.EqualError(t, err, "rpc error: code = Unauthenticated desc = access denied")
	// Next, configure node 1 to know about node 2
	node1.cli.Configure(testChannel, consenterSet1)
	node1.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})

	// Check that the communication works correctly between both nodes
	assertBiDiCommunicationForChannelWithSigner(t, node1, node2, testReq, testChannel)

	// Reconfigure node 2 to forget about node 1
	node2.cli.Configure(testChannel, []cluster.RemoteNode{})
	node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{})

	// Node 1 can still connect to node 2
	stub, err = node1.cli.Remote(testChannel, node2.nodeInfo.ID)
	require.NoError(t, err)
	// But it can't send a message because node 2 no longer authorizes node 1
	stream = assertEventualEstablishStreamWithSigner(t, stub)
	err = stream.Send(wrapSubmitReq(testSubReq))
	require.NoError(t, err)
	_, err = stream.Recv()
	require.EqualError(t, err, "rpc error: code = Unauthenticated desc = access denied")
	// Reconfigure node 2 to know about node 1
	node2.cli.Configure(testChannel, consenterSet1)
	node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})

	// Node 1 can still connect to node 2
	stub, err = node1.cli.Remote(testChannel, node2.nodeInfo.ID)
	require.NoError(t, err)
	// Node 1 can now send a message to node 2
	stream = assertEventualEstablishStreamWithSigner(t, stub)
	var wg sync.WaitGroup
	wg.Add(1)
	node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
		wg.Done()
	})
	err = stream.Send(wrapSubmitReq(testSubReq))
	require.NoError(t, err)
	wg.Wait()
	// Reconfigure node 2's consenter set while the stream is active.
	// The stream should not be affected, as node 1 is still part of the consenter set
	node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node1.nodeInfo.ID), Identity: node1.service.NodeIdentity},
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})
	wg.Add(1)
	node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
		wg.Done()
	})
	err = stream.Send(wrapSubmitReq(testSubReq))
	require.NoError(t, err)
	wg.Wait()
	// Reconfigure node 2's consenter set, removing node 1;
	// the stream is now marked as stale
	node2.service.ConfigureNodeCerts(testChannel, []*common.Consenter{
		{Id: uint32(node2.nodeInfo.ID), Identity: node2.service.NodeIdentity},
	})
	err = stream.Send(wrapSubmitReq(testSubReq))
	require.NoError(t, err)
	_, err = stream.Recv()
	require.EqualError(t, err, "rpc error: code = Unknown desc = stream 2 is stale")
}

func TestCommServiceReconnectAuth(t *testing.T) {
	// Scenario: node 1 and node 2 are connected,
	// and node 2 is taken offline.
	// Node 1 tries to send a message to node 2 but fails,
	// and afterwards node 2 is brought back, after which
	// node 1 sends more messages, and it should succeed
	// sending a message to node 2 eventually.
	t.Parallel()

	node1 := newClusterServiceNode(t)
	node2 := newClusterServiceNode(t)
	defer node1.server.Stop()
	defer node2.server.Stop()
	defer node1.cli.Shutdown()
	defer node2.cli.Shutdown()
	node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(nil)

	config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
	node1.cli.Configure(testChannel, config)
	node2.cli.Configure(testChannel, config)

	// Take node 2 offline by shutting down its gRPC server
	node2.server.Stop()

	// Obtain the stub for node 2.
	// Should succeed, because the connection was created at the time of configuration
	stub, err := node1.cli.Remote(testChannel, node2.nodeInfo.ID)
	require.NoError(t, err)
	// Try to obtain a stream. Should not succeed.
	gt := gomega.NewGomegaWithT(t)
	gt.Eventually(func() error {
		_, err = stub.NewStream(time.Hour)
		return err
	}).Should(gomega.Not(gomega.Succeed()))
	// Wait for the port to be released
	for {
		lsnr, err := net.Listen("tcp", node2.nodeInfo.Endpoint)
		if err == nil {
			lsnr.Close()
			break
		}
	}

	// Resurrect node 2
	node2.resurrect()

	// Send a message from node 1 to node 2.
	// Should succeed eventually
	assertEventualSendMessageWithSigner(t, stub, testReq)
}
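
// TestBuildStepRespone verifies the conversion of a ClusterNodeServiceStepResponse
// into a StepResponse, including the error paths for nil and unexpected payloads.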
func TestBuildStepRespone(t *testing.T) {
	t.Parallel()

	tcs := []struct {
		name  string
		input *orderer.ClusterNodeServiceStepResponse
		res   *orderer.StepResponse
		err   string
	}{
		{
			name:  "input is nil",
			input: nil,
			res:   nil,
			err:   "input response object is nil",
		},
		{
			name: "complete node step response",
			input: &orderer.ClusterNodeServiceStepResponse{
				Payload: &orderer.ClusterNodeServiceStepResponse_TranorderRes{
					TranorderRes: &orderer.TransactionOrderResponse{
						Channel: "test",
						Status:  common.Status_SUCCESS,
					},
				},
			},
			res: &orderer.StepResponse{
				Payload: &orderer.StepResponse_SubmitRes{
					SubmitRes: &orderer.SubmitResponse{
						Channel: "test",
						Status:  common.Status_SUCCESS,
					},
				},
			},
			err: "",
		},
		{
			name: "input invalid object",
			input: &orderer.ClusterNodeServiceStepResponse{
				Payload: nil,
			},
			res: nil,
			err: "service stream returned with invalid response type",
		},
	}
	for _, tc := range tcs {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			res, err := cluster.BuildStepRespone(tc.input)
			if tc.err != "" {
				require.EqualError(t, err, tc.err)
				require.Nil(t, res)
			} else {
				require.NoError(t, err)
				require.EqualValues(t, tc.res, res)
			}
		})
	}
}
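
// assertEventualSendMessageWithSigner repeatedly opens a new stream and tries to
// send req until a send succeeds within the test timeout, returning the stream used.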
func assertEventualSendMessageWithSigner(t *testing.T, rpc *cluster.RemoteContext, req *orderer.SubmitRequest) *cluster.Stream {
	var res *cluster.Stream
	gt := gomega.NewGomegaWithT(t)
	gt.Eventually(func() error {
		stream, err := rpc.NewStream(time.Hour)
		if err != nil {
			return err
		}
		res = stream
		return stream.Send(wrapSubmitReq(req))
	}, timeout).Should(gomega.Succeed())
	return res
}
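
// assertEventualEstablishStreamWithSigner retries NewStream until a stream to the
// remote is established within the test timeout, and returns it.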
func assertEventualEstablishStreamWithSigner(t *testing.T, rpc *cluster.RemoteContext) *cluster.Stream {
	var res *cluster.Stream
	gt := gomega.NewGomegaWithT(t)
	gt.Eventually(func() error {
		stream, err := rpc.NewStream(time.Hour)
		res = stream
		return err
	}, timeout).Should(gomega.Succeed())
	return res
}
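
// assertBiDiCommunicationForChannelWithSigner checks communication in both directions:
// each node sends msgToSend to the other over the given channel, and the receiver's
// handler must observe exactly that message from the sender's node ID.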
func assertBiDiCommunicationForChannelWithSigner(t *testing.T, node1, node2 *clusterServiceNode, msgToSend *orderer.SubmitRequest, channel string) {
	establish := []struct {
		label    string
		sender   *clusterServiceNode
		receiver *clusterServiceNode
		target   uint64
	}{
		{label: "1->2", sender: node1, target: node2.nodeInfo.ID, receiver: node2},
		{label: "2->1", sender: node2, target: node1.nodeInfo.ID, receiver: node1},
	}
	for _, estab := range establish {
		stub, err := estab.sender.cli.Remote(channel, estab.target)
		require.NoError(t, err)

		stream := assertEventualEstablishStreamWithSigner(t, stub)

		var wg sync.WaitGroup
		wg.Add(1)
		estab.receiver.handler.On("OnSubmit", channel, estab.sender.nodeInfo.ID, mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
			req := args.Get(2).(*orderer.SubmitRequest)
			require.True(t, proto.Equal(req, msgToSend))
			t.Log(estab.label)
			wg.Done()
		})

		err = stream.Send(wrapSubmitReq(msgToSend))
		require.NoError(t, err)

		wg.Wait()
	}
}