go_study/fabric-main/orderer/common/cluster/service_test.go

334 lines
9.6 KiB
Go

/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package cluster_test
import (
"context"
"io"
"strings"
"testing"
"time"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
submitRequest1 = &orderer.StepRequest{
Payload: &orderer.StepRequest_SubmitRequest{
SubmitRequest: &orderer.SubmitRequest{},
},
}
submitRequest2 = &orderer.StepRequest{
Payload: &orderer.StepRequest_SubmitRequest{
SubmitRequest: &orderer.SubmitRequest{},
},
}
submitResponse1 = &orderer.StepResponse{
Payload: &orderer.StepResponse_SubmitRes{
SubmitRes: &orderer.SubmitResponse{},
},
}
consensusRequest = &orderer.StepRequest{
Payload: &orderer.StepRequest_ConsensusRequest{
ConsensusRequest: &orderer.ConsensusRequest{
Payload: []byte{1, 2, 3},
Channel: "mychannel",
},
},
}
)
func TestStep(t *testing.T) {
dispatcher := &mocks.Dispatcher{}
svc := &cluster.Service{
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
Dispatcher: dispatcher,
}
t.Run("Success", func(t *testing.T) {
stream := &mocks.StepStream{}
stream.On("Context").Return(context.Background())
stream.On("Recv").Return(consensusRequest, nil).Once()
stream.On("Recv").Return(consensusRequest, nil).Once()
dispatcher.On("DispatchConsensus", mock.Anything, consensusRequest.GetConsensusRequest()).Return(nil).Once()
dispatcher.On("DispatchConsensus", mock.Anything, consensusRequest.GetConsensusRequest()).Return(io.EOF).Once()
err := svc.Step(stream)
require.NoError(t, err)
})
t.Run("Failure", func(t *testing.T) {
stream := &mocks.StepStream{}
stream.On("Context").Return(context.Background())
stream.On("Recv").Return(consensusRequest, nil).Once()
dispatcher.On("DispatchConsensus", mock.Anything, consensusRequest.GetConsensusRequest()).Return(errors.New("oops")).Once()
err := svc.Step(stream)
require.EqualError(t, err, "oops")
})
}
func TestSubmitSuccess(t *testing.T) {
dispatcher := &mocks.Dispatcher{}
stream := &mocks.StepStream{}
stream.On("Context").Return(context.Background())
// Send to the stream 2 messages, and afterwards close the stream
stream.On("Recv").Return(submitRequest1, nil).Once()
stream.On("Recv").Return(submitRequest2, nil).Once()
stream.On("Recv").Return(nil, io.EOF).Once()
// Send should be called for each corresponding receive
stream.On("Send", submitResponse1).Return(nil).Twice()
responses := make(chan *orderer.StepRequest, 2)
responses <- submitRequest1
responses <- submitRequest2
dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Return(nil).Once()
dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Return(nil).Once()
// Ensure we pass requests to DispatchSubmit in-order
dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
expectedRequest := <-responses
actualRequest := args.Get(1).(*orderer.StepRequest)
require.True(t, expectedRequest == actualRequest)
})
svc := &cluster.Service{
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
Dispatcher: dispatcher,
}
err := svc.Step(stream)
require.NoError(t, err)
dispatcher.AssertNumberOfCalls(t, "DispatchSubmit", 2)
}
type tuple struct {
msg interface{}
err error
}
func (t tuple) asArray() []interface{} {
return []interface{}{t.msg, t.err}
}
func TestSubmitFailure(t *testing.T) {
oops := errors.New("oops")
testCases := []struct {
name string
receiveReturns []tuple
sendReturns []error
dispatchReturns error
expectedDispatches int
}{
{
name: "Recv() fails",
receiveReturns: []tuple{
{msg: nil, err: oops},
},
},
{
name: "DispatchSubmit() fails",
receiveReturns: []tuple{
{msg: submitRequest1},
},
expectedDispatches: 1,
dispatchReturns: oops,
},
}
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
dispatcher := &mocks.Dispatcher{}
stream := &mocks.StepStream{}
stream.On("Context").Return(context.Background())
for _, recv := range testCase.receiveReturns {
stream.On("Recv").Return(recv.asArray()...).Once()
}
for _, send := range testCase.sendReturns {
stream.On("Send", mock.Anything).Return(send).Once()
}
defer dispatcher.AssertNumberOfCalls(t, "DispatchSubmit", testCase.expectedDispatches)
dispatcher.On("DispatchSubmit", mock.Anything, mock.Anything).Return(testCase.dispatchReturns)
svc := &cluster.Service{
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
Dispatcher: dispatcher,
}
err := svc.Step(stream)
require.EqualError(t, err, oops.Error())
})
}
}
func TestIngresStreamsMetrics(t *testing.T) {
dispatcher := &mocks.Dispatcher{}
dispatcher.On("DispatchConsensus", mock.Anything, mock.Anything).Return(nil)
fakeProvider := &mocks.MetricsProvider{}
testMetrics := &testMetrics{
fakeProvider: fakeProvider,
}
testMetrics.initialize()
metrics := cluster.NewMetrics(fakeProvider)
svc := &cluster.Service{
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
Dispatcher: dispatcher,
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: metrics,
},
}
stream := &mocks.StepStream{}
stream.On("Context").Return(context.Background())
// Upon first receive, return nil to proceed to the next receive.
stream.On("Recv").Return(nil, nil).Once()
// Upon the second receive, return EOF to trigger the stream to end
stream.On("Recv").Return(nil, io.EOF).Once()
svc.Step(stream)
// The stream started so stream count incremented from 0 to 1
require.Equal(t, float64(1), testMetrics.ingressStreamsCount.SetArgsForCall(0))
// The stream ended so stream count is decremented from 1 to 0
require.Equal(t, float64(0), testMetrics.ingressStreamsCount.SetArgsForCall(1))
}
func TestServiceGRPC(t *testing.T) {
// Check that Service correctly implements the gRPC interface
srv, err := comm.NewGRPCServer("127.0.0.1:0", comm.ServerConfig{})
require.NoError(t, err)
orderer.RegisterClusterServer(srv.Server(), &cluster.Service{
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
})
}
func TestExpirationWarningIngress(t *testing.T) {
ca, err := tlsgen.NewCA()
require.NoError(t, err)
serverCert, err := ca.NewServerCertKeyPair("127.0.0.1")
require.NoError(t, err)
clientCert, err := ca.NewClientCertKeyPair()
require.NoError(t, err)
dispatcher := &mocks.Dispatcher{}
dispatcher.On("DispatchConsensus", mock.Anything, mock.Anything).Return(nil)
svc := &cluster.Service{
CertExpWarningThreshold: time.Until(clientCert.TLSCert.NotAfter),
MinimumExpirationWarningInterval: time.Second * 2,
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
Dispatcher: dispatcher,
}
alerts := make(chan struct{}, 10)
svc.Logger = svc.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "expires in less than 23h59m") {
alerts <- struct{}{}
}
return nil
}))
srvConf := comm.ServerConfig{
SecOpts: comm.SecureOptions{
Certificate: serverCert.Cert,
Key: serverCert.Key,
UseTLS: true,
ClientRootCAs: [][]byte{ca.CertBytes()},
RequireClientCert: true,
},
}
srv, err := comm.NewGRPCServer("127.0.0.1:0", srvConf)
require.NoError(t, err)
orderer.RegisterClusterServer(srv.Server(), svc)
go srv.Start()
defer srv.Stop()
clientConf := comm.ClientConfig{
DialTimeout: time.Second * 3,
SecOpts: comm.SecureOptions{
ServerRootCAs: [][]byte{ca.CertBytes()},
UseTLS: true,
Key: clientCert.Key,
Certificate: clientCert.Cert,
RequireClientCert: true,
},
}
conn, err := clientConf.Dial(srv.Address())
require.NoError(t, err)
cl := orderer.NewClusterClient(conn)
stream, err := cl.Step(context.Background())
require.NoError(t, err)
err = stream.Send(consensusRequest)
require.NoError(t, err)
// An alert is logged at the first time.
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have received an alert")
}
err = stream.Send(consensusRequest)
require.NoError(t, err)
// No alerts in a consecutive time.
select {
case <-alerts:
t.Fatal("Should have not received an alert")
case <-time.After(time.Millisecond * 500):
}
// Wait for alert expiration interval to expire.
time.Sleep(svc.MinimumExpirationWarningInterval + time.Second)
err = stream.Send(consensusRequest)
require.NoError(t, err)
// An alert should be logged now after the timeout expired.
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have received an alert")
}
}