/* Copyright IBM Corp. 2017 All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ package cluster_test import ( "context" "io" "strings" "testing" "time" "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric/common/crypto/tlsgen" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/metrics/disabled" "github.com/hyperledger/fabric/internal/pkg/comm" "github.com/hyperledger/fabric/orderer/common/cluster" "github.com/hyperledger/fabric/orderer/common/cluster/mocks" "github.com/pkg/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) var ( submitRequest1 = &orderer.StepRequest{ Payload: &orderer.StepRequest_SubmitRequest{ SubmitRequest: &orderer.SubmitRequest{}, }, } submitRequest2 = &orderer.StepRequest{ Payload: &orderer.StepRequest_SubmitRequest{ SubmitRequest: &orderer.SubmitRequest{}, }, } submitResponse1 = &orderer.StepResponse{ Payload: &orderer.StepResponse_SubmitRes{ SubmitRes: &orderer.SubmitResponse{}, }, } consensusRequest = &orderer.StepRequest{ Payload: &orderer.StepRequest_ConsensusRequest{ ConsensusRequest: &orderer.ConsensusRequest{ Payload: []byte{1, 2, 3}, Channel: "mychannel", }, }, } ) func TestStep(t *testing.T) { dispatcher := &mocks.Dispatcher{} svc := &cluster.Service{ StreamCountReporter: &cluster.StreamCountReporter{ Metrics: cluster.NewMetrics(&disabled.Provider{}), }, Logger: flogging.MustGetLogger("test"), StepLogger: flogging.MustGetLogger("test"), Dispatcher: dispatcher, } t.Run("Success", func(t *testing.T) { stream := &mocks.StepStream{} stream.On("Context").Return(context.Background()) stream.On("Recv").Return(consensusRequest, nil).Once() stream.On("Recv").Return(consensusRequest, nil).Once() dispatcher.On("DispatchConsensus", mock.Anything, consensusRequest.GetConsensusRequest()).Return(nil).Once() dispatcher.On("DispatchConsensus", mock.Anything, consensusRequest.GetConsensusRequest()).Return(io.EOF).Once() err := svc.Step(stream) require.NoError(t, err) }) t.Run("Failure", func(t *testing.T) { stream := &mocks.StepStream{} stream.On("Context").Return(context.Background()) stream.On("Recv").Return(consensusRequest, nil).Once() dispatcher.On("DispatchConsensus", mock.Anything, consensusRequest.GetConsensusRequest()).Return(errors.New("oops")).Once() err := svc.Step(stream) require.EqualError(t, err, "oops") }) } func TestSubmitSuccess(t *testing.T) { dispatcher := &mocks.Dispatcher{} stream := &mocks.StepStream{} stream.On("Context").Return(context.Background()) // Send to the stream 2 messages, and afterwards close the stream stream.On("Recv").Return(submitRequest1, nil).Once() stream.On("Recv").Return(submitRequest2, nil).Once() stream.On("Recv").Return(nil, io.EOF).Once() // Send should be called for each corresponding receive stream.On("Send", submitResponse1).Return(nil).Twice() responses := make(chan *orderer.StepRequest, 2) responses <- submitRequest1 responses <- submitRequest2 dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Return(nil).Once() dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Return(nil).Once() // Ensure we pass requests to DispatchSubmit in-order dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { expectedRequest := <-responses actualRequest := args.Get(1).(*orderer.StepRequest) require.True(t, expectedRequest == actualRequest) }) svc := &cluster.Service{ StreamCountReporter: &cluster.StreamCountReporter{ Metrics: cluster.NewMetrics(&disabled.Provider{}), }, Logger: flogging.MustGetLogger("test"), StepLogger: flogging.MustGetLogger("test"), Dispatcher: dispatcher, } err := svc.Step(stream) require.NoError(t, err) dispatcher.AssertNumberOfCalls(t, "DispatchSubmit", 2) } type tuple struct { msg interface{} err error } func (t tuple) asArray() []interface{} { return []interface{}{t.msg, t.err} } func TestSubmitFailure(t *testing.T) { oops := errors.New("oops") testCases := []struct { name string receiveReturns []tuple sendReturns []error dispatchReturns error expectedDispatches int }{ { name: "Recv() fails", receiveReturns: []tuple{ {msg: nil, err: oops}, }, }, { name: "DispatchSubmit() fails", receiveReturns: []tuple{ {msg: submitRequest1}, }, expectedDispatches: 1, dispatchReturns: oops, }, } for _, testCase := range testCases { testCase := testCase t.Run(testCase.name, func(t *testing.T) { dispatcher := &mocks.Dispatcher{} stream := &mocks.StepStream{} stream.On("Context").Return(context.Background()) for _, recv := range testCase.receiveReturns { stream.On("Recv").Return(recv.asArray()...).Once() } for _, send := range testCase.sendReturns { stream.On("Send", mock.Anything).Return(send).Once() } defer dispatcher.AssertNumberOfCalls(t, "DispatchSubmit", testCase.expectedDispatches) dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Return(testCase.dispatchReturns) svc := &cluster.Service{ StreamCountReporter: &cluster.StreamCountReporter{ Metrics: cluster.NewMetrics(&disabled.Provider{}), }, Logger: flogging.MustGetLogger("test"), StepLogger: flogging.MustGetLogger("test"), Dispatcher: dispatcher, } err := svc.Step(stream) require.EqualError(t, err, oops.Error()) }) } } func TestIngresStreamsMetrics(t *testing.T) { dispatcher := &mocks.Dispatcher{} dispatcher.On("DispatchConsensus", mock.Anything, mock.Anything).Return(nil) fakeProvider := &mocks.MetricsProvider{} testMetrics := &testMetrics{ fakeProvider: fakeProvider, } testMetrics.initialize() metrics := cluster.NewMetrics(fakeProvider) svc := &cluster.Service{ Logger: flogging.MustGetLogger("test"), StepLogger: flogging.MustGetLogger("test"), Dispatcher: dispatcher, StreamCountReporter: &cluster.StreamCountReporter{ Metrics: metrics, }, } stream := &mocks.StepStream{} stream.On("Context").Return(context.Background()) // Upon first receive, return nil to proceed to the next receive. stream.On("Recv").Return(nil, nil).Once() // Upon the second receive, return EOF to trigger the stream to end stream.On("Recv").Return(nil, io.EOF).Once() svc.Step(stream) // The stream started so stream count incremented from 0 to 1 require.Equal(t, float64(1), testMetrics.ingressStreamsCount.SetArgsForCall(0)) // The stream ended so stream count is decremented from 1 to 0 require.Equal(t, float64(0), testMetrics.ingressStreamsCount.SetArgsForCall(1)) } func TestServiceGRPC(t *testing.T) { // Check that Service correctly implements the gRPC interface srv, err := comm.NewGRPCServer("127.0.0.1:0", comm.ServerConfig{}) require.NoError(t, err) orderer.RegisterClusterServer(srv.Server(), &cluster.Service{ Logger: flogging.MustGetLogger("test"), StepLogger: flogging.MustGetLogger("test"), }) } func TestExpirationWarningIngress(t *testing.T) { ca, err := tlsgen.NewCA() require.NoError(t, err) serverCert, err := ca.NewServerCertKeyPair("127.0.0.1") require.NoError(t, err) clientCert, err := ca.NewClientCertKeyPair() require.NoError(t, err) dispatcher := &mocks.Dispatcher{} dispatcher.On("DispatchConsensus", mock.Anything, mock.Anything).Return(nil) svc := &cluster.Service{ CertExpWarningThreshold: time.Until(clientCert.TLSCert.NotAfter), MinimumExpirationWarningInterval: time.Second * 2, StreamCountReporter: &cluster.StreamCountReporter{ Metrics: cluster.NewMetrics(&disabled.Provider{}), }, Logger: flogging.MustGetLogger("test"), StepLogger: flogging.MustGetLogger("test"), Dispatcher: dispatcher, } alerts := make(chan struct{}, 10) svc.Logger = svc.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error { if strings.Contains(entry.Message, "expires in less than 23h59m") { alerts <- struct{}{} } return nil })) srvConf := comm.ServerConfig{ SecOpts: comm.SecureOptions{ Certificate: serverCert.Cert, Key: serverCert.Key, UseTLS: true, ClientRootCAs: [][]byte{ca.CertBytes()}, RequireClientCert: true, }, } srv, err := comm.NewGRPCServer("127.0.0.1:0", srvConf) require.NoError(t, err) orderer.RegisterClusterServer(srv.Server(), svc) go srv.Start() defer srv.Stop() clientConf := comm.ClientConfig{ DialTimeout: time.Second * 3, SecOpts: comm.SecureOptions{ ServerRootCAs: [][]byte{ca.CertBytes()}, UseTLS: true, Key: clientCert.Key, Certificate: clientCert.Cert, RequireClientCert: true, }, } conn, err := clientConf.Dial(srv.Address()) require.NoError(t, err) cl := orderer.NewClusterClient(conn) stream, err := cl.Step(context.Background()) require.NoError(t, err) err = stream.Send(consensusRequest) require.NoError(t, err) // An alert is logged at the first time. select { case <-alerts: case <-time.After(time.Second * 5): t.Fatal("Should have received an alert") } err = stream.Send(consensusRequest) require.NoError(t, err) // No alerts in a consecutive time. select { case <-alerts: t.Fatal("Should have not received an alert") case <-time.After(time.Millisecond * 500): } // Wait for alert expiration interval to expire. time.Sleep(svc.MinimumExpirationWarningInterval + time.Second) err = stream.Send(consensusRequest) require.NoError(t, err) // An alert should be logged now after the timeout expired. select { case <-alerts: case <-time.After(time.Second * 5): t.Fatal("Should have received an alert") } }