1082 lines
35 KiB
Go
1082 lines
35 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package deliver_test
|
|
|
|
import (
|
|
"context"
|
|
"crypto/x509"
|
|
"encoding/pem"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/golang/protobuf/ptypes/timestamp"
|
|
cb "github.com/hyperledger/fabric-protos-go/common"
|
|
"github.com/hyperledger/fabric-protos-go/msp"
|
|
ab "github.com/hyperledger/fabric-protos-go/orderer"
|
|
"github.com/hyperledger/fabric/common/crypto/tlsgen"
|
|
"github.com/hyperledger/fabric/common/deliver"
|
|
"github.com/hyperledger/fabric/common/deliver/mock"
|
|
"github.com/hyperledger/fabric/common/ledger/blockledger"
|
|
"github.com/hyperledger/fabric/common/metrics/disabled"
|
|
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
|
|
"github.com/hyperledger/fabric/common/util"
|
|
"github.com/hyperledger/fabric/protoutil"
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
var (
|
|
seekOldest = &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}},
|
|
}
|
|
|
|
seekNewest = &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}},
|
|
}
|
|
)
|
|
|
|
var _ = Describe("Deliver", func() {
|
|
Describe("NewHandler", func() {
|
|
var fakeChainManager *mock.ChainManager
|
|
var cert *x509.Certificate
|
|
var certBytes []byte
|
|
var serializedIdentity []byte
|
|
BeforeEach(func() {
|
|
fakeChainManager = &mock.ChainManager{}
|
|
|
|
ca, err := tlsgen.NewCA()
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
certBytes = ca.CertBytes()
|
|
|
|
der, _ := pem.Decode(ca.CertBytes())
|
|
cert, err = x509.ParseCertificate(der.Bytes)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
serializedIdentity = protoutil.MarshalOrPanic(&msp.SerializedIdentity{IdBytes: certBytes})
|
|
})
|
|
|
|
It("returns a new handler", func() {
|
|
handler := deliver.NewHandler(
|
|
fakeChainManager,
|
|
time.Second,
|
|
false,
|
|
deliver.NewMetrics(&disabled.Provider{}),
|
|
false)
|
|
Expect(handler).NotTo(BeNil())
|
|
|
|
Expect(handler.ChainManager).To(Equal(fakeChainManager))
|
|
Expect(handler.TimeWindow).To(Equal(time.Second))
|
|
// binding inspector is func; unable to easily validate
|
|
Expect(handler.BindingInspector).NotTo(BeNil())
|
|
})
|
|
|
|
Context("Handler is initialized with expiration checks", func() {
|
|
It("Returns exactly what is found in the certificate", func() {
|
|
handler := deliver.NewHandler(
|
|
fakeChainManager,
|
|
time.Second,
|
|
false,
|
|
deliver.NewMetrics(&disabled.Provider{}),
|
|
false)
|
|
|
|
Expect(handler.ExpirationCheckFunc(serializedIdentity)).To(Equal(cert.NotAfter))
|
|
})
|
|
})
|
|
|
|
Context("Handler is initialized without expiration checks", func() {
|
|
It("Doesn't parse the NotAfter time of the certificate", func() {
|
|
handler := deliver.NewHandler(
|
|
fakeChainManager,
|
|
time.Second,
|
|
false,
|
|
deliver.NewMetrics(&disabled.Provider{}),
|
|
true)
|
|
|
|
Expect(handler.ExpirationCheckFunc(serializedIdentity)).NotTo(Equal(cert.NotAfter))
|
|
})
|
|
})
|
|
})
|
|
|
|
Describe("ExtractChannelHeaderCertHash", func() {
|
|
It("extracts the TLS certificate hash from a channel header", func() {
|
|
chdr := &cb.ChannelHeader{TlsCertHash: []byte("tls-cert")}
|
|
|
|
result := deliver.ExtractChannelHeaderCertHash(chdr)
|
|
Expect(result).To(Equal([]byte("tls-cert")))
|
|
})
|
|
|
|
Context("when the message is not a channel header", func() {
|
|
It("returns nil", func() {
|
|
result := deliver.ExtractChannelHeaderCertHash(&cb.Envelope{})
|
|
Expect(result).To(BeNil())
|
|
})
|
|
})
|
|
|
|
Context("when the message is nil", func() {
|
|
It("returns nil", func() {
|
|
var ch *cb.ChannelHeader
|
|
result := deliver.ExtractChannelHeaderCertHash(ch)
|
|
Expect(result).To(BeNil())
|
|
})
|
|
})
|
|
})
|
|
|
|
Describe("Handle", func() {
|
|
var (
|
|
errCh chan struct{}
|
|
fakeChain *mock.Chain
|
|
fakeBlockReader *mock.BlockReader
|
|
fakeBlockIterator *mock.BlockIterator
|
|
fakeChainManager *mock.ChainManager
|
|
fakePolicyChecker *mock.PolicyChecker
|
|
fakeReceiver *mock.Receiver
|
|
fakeResponseSender *mock.ResponseSender
|
|
fakeInspector *mock.Inspector
|
|
fakeStreamsOpened *metricsfakes.Counter
|
|
fakeStreamsClosed *metricsfakes.Counter
|
|
fakeRequestsReceived *metricsfakes.Counter
|
|
fakeRequestsCompleted *metricsfakes.Counter
|
|
fakeBlocksSent *metricsfakes.Counter
|
|
|
|
handler *deliver.Handler
|
|
server *deliver.Server
|
|
|
|
channelHeader *cb.ChannelHeader
|
|
seekInfo *ab.SeekInfo
|
|
ts *timestamp.Timestamp
|
|
|
|
channelHeaderPayload []byte
|
|
seekInfoPayload []byte
|
|
envelope *cb.Envelope
|
|
)
|
|
|
|
BeforeEach(func() {
|
|
errCh = make(chan struct{})
|
|
fakeChain = &mock.Chain{}
|
|
fakeChain.ErroredReturns(errCh)
|
|
|
|
block := &cb.Block{
|
|
Header: &cb.BlockHeader{Number: 100},
|
|
}
|
|
fakeBlockIterator = &mock.BlockIterator{}
|
|
fakeBlockIterator.NextReturns(block, cb.Status_SUCCESS)
|
|
|
|
fakeBlockReader = &mock.BlockReader{}
|
|
fakeBlockReader.HeightReturns(1000)
|
|
fakeBlockReader.IteratorReturns(fakeBlockIterator, 100)
|
|
fakeChain.ReaderReturns(fakeBlockReader)
|
|
|
|
fakeChainManager = &mock.ChainManager{}
|
|
fakeChainManager.GetChainReturns(fakeChain)
|
|
|
|
fakePolicyChecker = &mock.PolicyChecker{}
|
|
fakeReceiver = &mock.Receiver{}
|
|
fakeResponseSender = &mock.ResponseSender{}
|
|
fakeResponseSender.DataTypeReturns("block")
|
|
|
|
fakeInspector = &mock.Inspector{}
|
|
|
|
fakeStreamsOpened = &metricsfakes.Counter{}
|
|
fakeStreamsOpened.WithReturns(fakeStreamsOpened)
|
|
fakeStreamsClosed = &metricsfakes.Counter{}
|
|
fakeStreamsClosed.WithReturns(fakeStreamsClosed)
|
|
fakeRequestsReceived = &metricsfakes.Counter{}
|
|
fakeRequestsReceived.WithReturns(fakeRequestsReceived)
|
|
fakeRequestsCompleted = &metricsfakes.Counter{}
|
|
fakeRequestsCompleted.WithReturns(fakeRequestsCompleted)
|
|
fakeBlocksSent = &metricsfakes.Counter{}
|
|
fakeBlocksSent.WithReturns(fakeBlocksSent)
|
|
|
|
deliverMetrics := &deliver.Metrics{
|
|
StreamsOpened: fakeStreamsOpened,
|
|
StreamsClosed: fakeStreamsClosed,
|
|
RequestsReceived: fakeRequestsReceived,
|
|
RequestsCompleted: fakeRequestsCompleted,
|
|
BlocksSent: fakeBlocksSent,
|
|
}
|
|
|
|
handler = &deliver.Handler{
|
|
ChainManager: fakeChainManager,
|
|
TimeWindow: time.Second,
|
|
BindingInspector: fakeInspector,
|
|
Metrics: deliverMetrics,
|
|
ExpirationCheckFunc: func([]byte) time.Time {
|
|
return time.Time{}
|
|
},
|
|
}
|
|
server = &deliver.Server{
|
|
Receiver: fakeReceiver,
|
|
PolicyChecker: fakePolicyChecker,
|
|
ResponseSender: fakeResponseSender,
|
|
}
|
|
|
|
ts = util.CreateUtcTimestamp()
|
|
channelHeader = &cb.ChannelHeader{
|
|
ChannelId: "chain-id",
|
|
Timestamp: ts,
|
|
}
|
|
seekInfo = &ab.SeekInfo{
|
|
Start: &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Specified{
|
|
Specified: &ab.SeekSpecified{Number: 100},
|
|
},
|
|
},
|
|
Stop: &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Specified{
|
|
Specified: &ab.SeekSpecified{Number: 100},
|
|
},
|
|
},
|
|
}
|
|
|
|
channelHeaderPayload = nil
|
|
seekInfoPayload = nil
|
|
|
|
envelope = &cb.Envelope{}
|
|
fakeReceiver.RecvReturns(envelope, nil)
|
|
fakeReceiver.RecvReturnsOnCall(1, nil, io.EOF)
|
|
})
|
|
|
|
JustBeforeEach(func() {
|
|
if channelHeaderPayload == nil {
|
|
channelHeaderPayload = protoutil.MarshalOrPanic(channelHeader)
|
|
}
|
|
if seekInfoPayload == nil {
|
|
seekInfoPayload = protoutil.MarshalOrPanic(seekInfo)
|
|
}
|
|
if envelope.Payload == nil {
|
|
payload := &cb.Payload{
|
|
Header: &cb.Header{
|
|
ChannelHeader: channelHeaderPayload,
|
|
SignatureHeader: protoutil.MarshalOrPanic(&cb.SignatureHeader{}),
|
|
},
|
|
Data: seekInfoPayload,
|
|
}
|
|
envelope.Payload = protoutil.MarshalOrPanic(payload)
|
|
}
|
|
})
|
|
|
|
It("records streams opened before streams closed", func() {
|
|
fakeStreamsOpened.AddStub = func(delta float64) {
|
|
defer GinkgoRecover()
|
|
Expect(fakeStreamsClosed.AddCallCount()).To(Equal(0))
|
|
}
|
|
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeStreamsOpened.AddCallCount()).To(Equal(1))
|
|
Expect(fakeStreamsOpened.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
})
|
|
|
|
It("records streams closed after streams opened", func() {
|
|
fakeStreamsClosed.AddStub = func(delta float64) {
|
|
defer GinkgoRecover()
|
|
Expect(fakeStreamsOpened.AddCallCount()).To(Equal(1))
|
|
}
|
|
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeStreamsClosed.AddCallCount()).To(Equal(1))
|
|
Expect(fakeStreamsClosed.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
})
|
|
|
|
It("validates the channel header with the binding inspector", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeInspector.InspectCallCount()).To(Equal(1))
|
|
ctx, header := fakeInspector.InspectArgsForCall(0)
|
|
Expect(ctx).To(Equal(context.Background()))
|
|
Expect(proto.Equal(header, channelHeader)).To(BeTrue())
|
|
})
|
|
|
|
Context("when channel header validation fails", func() {
|
|
BeforeEach(func() {
|
|
fakeInspector.InspectReturns(errors.New("bad-header-thingy"))
|
|
})
|
|
|
|
It("sends a bad request message", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
It("gets the chain from the chain manager", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeChainManager.GetChainCallCount()).To(Equal(1))
|
|
chid := fakeChainManager.GetChainArgsForCall(0)
|
|
Expect(chid).To(Equal("chain-id"))
|
|
})
|
|
|
|
It("receives messages until io.EOF is returned", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeReceiver.RecvCallCount()).To(Equal(2))
|
|
})
|
|
|
|
It("evaluates access control", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakePolicyChecker.CheckPolicyCallCount()).To(BeNumerically(">=", 1))
|
|
e, cid := fakePolicyChecker.CheckPolicyArgsForCall(0)
|
|
Expect(proto.Equal(e, envelope)).To(BeTrue())
|
|
Expect(cid).To(Equal("chain-id"))
|
|
})
|
|
|
|
It("gets a block iterator from the starting block", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeBlockReader.IteratorCallCount()).To(Equal(1))
|
|
startPosition := fakeBlockReader.IteratorArgsForCall(0)
|
|
Expect(proto.Equal(startPosition, seekInfo.Start)).To(BeTrue())
|
|
})
|
|
|
|
Context("when multiple blocks are requested", func() {
|
|
BeforeEach(func() {
|
|
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
|
|
blk := &cb.Block{
|
|
Header: &cb.BlockHeader{Number: 994 + uint64(fakeBlockIterator.NextCallCount())},
|
|
}
|
|
return blk, cb.Status_SUCCESS
|
|
}
|
|
seekInfo = &ab.SeekInfo{
|
|
Start: &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 995}},
|
|
},
|
|
Stop: seekNewest,
|
|
}
|
|
})
|
|
|
|
It("sends all requested blocks", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(5))
|
|
for i := 0; i < 5; i++ {
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(i)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: 995 + uint64(i)},
|
|
}))
|
|
}
|
|
})
|
|
|
|
It("records requests received, blocks sent, and requests completed", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeRequestsReceived.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsReceived.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsReceived.WithCallCount()).To(Equal(1))
|
|
labelValues := fakeRequestsReceived.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block",
|
|
}))
|
|
|
|
Expect(fakeBlocksSent.AddCallCount()).To(Equal(5))
|
|
Expect(fakeBlocksSent.WithCallCount()).To(Equal(5))
|
|
for i := 0; i < 5; i++ {
|
|
Expect(fakeBlocksSent.AddArgsForCall(i)).To(BeNumerically("~", 1.0))
|
|
labelValues := fakeBlocksSent.WithArgsForCall(i)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block",
|
|
}))
|
|
}
|
|
|
|
Expect(fakeRequestsCompleted.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsCompleted.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsCompleted.WithCallCount()).To(Equal(1))
|
|
labelValues = fakeRequestsCompleted.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block",
|
|
"success", "true",
|
|
}))
|
|
})
|
|
})
|
|
|
|
Context("when seek info is configured to stop at the oldest block", func() {
|
|
BeforeEach(func() {
|
|
seekInfo = &ab.SeekInfo{Start: &ab.SeekPosition{}, Stop: seekOldest}
|
|
})
|
|
|
|
It("sends only the first block returned by the iterator", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeBlockReader.IteratorCallCount()).To(Equal(1))
|
|
start := fakeBlockReader.IteratorArgsForCall(0)
|
|
Expect(start).To(Equal(&ab.SeekPosition{}))
|
|
Expect(fakeBlockIterator.NextCallCount()).To(Equal(1))
|
|
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(1))
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(0)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: 100},
|
|
}))
|
|
})
|
|
})
|
|
|
|
Context("when seek info is configured to stop at the newest block", func() {
|
|
BeforeEach(func() {
|
|
seekInfo = &ab.SeekInfo{Start: &ab.SeekPosition{}, Stop: seekNewest}
|
|
|
|
fakeBlockReader.HeightReturns(3)
|
|
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
|
|
blk := &cb.Block{
|
|
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount())},
|
|
}
|
|
return blk, cb.Status_SUCCESS
|
|
}
|
|
})
|
|
|
|
It("sends blocks until the iterator reaches the reader height", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeBlockReader.IteratorCallCount()).To(Equal(1))
|
|
start := fakeBlockReader.IteratorArgsForCall(0)
|
|
Expect(start).To(Equal(&ab.SeekPosition{}))
|
|
|
|
Expect(fakeBlockIterator.NextCallCount()).To(Equal(2))
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(2))
|
|
for i := 0; i < fakeResponseSender.SendBlockResponseCallCount(); i++ {
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(i)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: uint64(i + 1)},
|
|
}))
|
|
}
|
|
})
|
|
})
|
|
|
|
Context("when seek info is configured to send just the newest block and a new block is committed to the ledger after the iterator is acquired", func() {
|
|
BeforeEach(func() {
|
|
seekInfo = &ab.SeekInfo{Start: seekNewest, Stop: seekNewest}
|
|
|
|
fakeBlockReader.IteratorReturns(fakeBlockIterator, 0)
|
|
fakeBlockReader.HeightReturns(2)
|
|
fakeChain.ReaderReturns(fakeBlockReader)
|
|
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
|
|
blk := &cb.Block{
|
|
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount() - 1)},
|
|
}
|
|
return blk, cb.Status_SUCCESS
|
|
}
|
|
})
|
|
|
|
It("sends only the newest block at the time the iterator was acquired", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeBlockReader.IteratorCallCount()).To(Equal(1))
|
|
Expect(fakeBlockIterator.NextCallCount()).To(Equal(1))
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(1))
|
|
for i := 0; i < fakeResponseSender.SendBlockResponseCallCount(); i++ {
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(i)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: uint64(i)},
|
|
}))
|
|
}
|
|
})
|
|
})
|
|
|
|
Context("when seek info is configured to header with sig content type", func() {
|
|
BeforeEach(func() {
|
|
seekInfo = &ab.SeekInfo{
|
|
Start: &ab.SeekPosition{},
|
|
Stop: seekNewest,
|
|
ContentType: ab.SeekInfo_HEADER_WITH_SIG,
|
|
}
|
|
|
|
fakeBlockReader.HeightReturns(3)
|
|
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
|
|
blk := &cb.Block{
|
|
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount())},
|
|
Data: &cb.BlockData{Data: [][]byte{{1}, {2}}},
|
|
Metadata: &cb.BlockMetadata{Metadata: [][]byte{{3}, {4}}},
|
|
}
|
|
return blk, cb.Status_SUCCESS
|
|
}
|
|
})
|
|
|
|
It("sends blocks with nil Data", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeBlockReader.IteratorCallCount()).To(Equal(1))
|
|
start := fakeBlockReader.IteratorArgsForCall(0)
|
|
Expect(start).To(Equal(&ab.SeekPosition{}))
|
|
|
|
Expect(fakeBlockIterator.NextCallCount()).To(Equal(2))
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(2))
|
|
for i := 0; i < fakeResponseSender.SendBlockResponseCallCount(); i++ {
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(i)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: uint64(i + 1)},
|
|
Data: nil,
|
|
Metadata: &cb.BlockMetadata{Metadata: [][]byte{{3}, {4}}},
|
|
}))
|
|
}
|
|
})
|
|
})
|
|
|
|
Context("when filtered blocks are requested", func() {
|
|
var fakeResponseSender *mock.FilteredResponseSender
|
|
|
|
BeforeEach(func() {
|
|
fakeResponseSender = &mock.FilteredResponseSender{}
|
|
fakeResponseSender.IsFilteredReturns(true)
|
|
fakeResponseSender.DataTypeReturns("filtered_block")
|
|
server.ResponseSender = fakeResponseSender
|
|
})
|
|
|
|
It("checks if the response sender is filtered", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.IsFilteredCallCount()).To(Equal(1))
|
|
})
|
|
|
|
Context("when the response sender indicates it is not filtered", func() {
|
|
BeforeEach(func() {
|
|
fakeResponseSender.IsFilteredReturns(false)
|
|
})
|
|
|
|
It("labels the metric with filtered=false", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeRequestsReceived.WithCallCount()).To(Equal(1))
|
|
labelValues := fakeRequestsReceived.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "filtered_block",
|
|
}))
|
|
})
|
|
})
|
|
|
|
It("records requests received, blocks sent, and requests completed with the filtered label set to true", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeRequestsReceived.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsReceived.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsReceived.WithCallCount()).To(Equal(1))
|
|
labelValues := fakeRequestsReceived.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "true",
|
|
"data_type", "filtered_block",
|
|
}))
|
|
|
|
Expect(fakeBlocksSent.AddCallCount()).To(Equal(1))
|
|
Expect(fakeBlocksSent.WithCallCount()).To(Equal(1))
|
|
Expect(fakeBlocksSent.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
labelValues = fakeBlocksSent.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "true",
|
|
"data_type", "filtered_block",
|
|
}))
|
|
|
|
Expect(fakeRequestsCompleted.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsCompleted.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsCompleted.WithCallCount()).To(Equal(1))
|
|
labelValues = fakeRequestsCompleted.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "true",
|
|
"data_type", "filtered_block",
|
|
"success", "true",
|
|
}))
|
|
})
|
|
})
|
|
|
|
Context("when blocks with private data are requested", func() {
|
|
var fakeResponseSender *mock.PrivateDataResponseSender
|
|
|
|
BeforeEach(func() {
|
|
fakeResponseSender = &mock.PrivateDataResponseSender{}
|
|
fakeResponseSender.DataTypeReturns("block_and_pvtdata")
|
|
server.ResponseSender = fakeResponseSender
|
|
})
|
|
|
|
It("handles the request and returns private data for all collections", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
Expect(fakeResponseSender.DataTypeCallCount()).To(Equal(1))
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(1))
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(0)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: 100},
|
|
}))
|
|
})
|
|
|
|
It("records requests received, blocks sent, and requests completed with the privatedata label set to true", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeRequestsReceived.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsReceived.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsReceived.WithCallCount()).To(Equal(1))
|
|
labelValues := fakeRequestsReceived.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block_and_pvtdata",
|
|
}))
|
|
|
|
Expect(fakeBlocksSent.AddCallCount()).To(Equal(1))
|
|
Expect(fakeBlocksSent.WithCallCount()).To(Equal(1))
|
|
Expect(fakeBlocksSent.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
labelValues = fakeBlocksSent.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block_and_pvtdata",
|
|
}))
|
|
|
|
Expect(fakeRequestsCompleted.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsCompleted.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsCompleted.WithCallCount()).To(Equal(1))
|
|
labelValues = fakeRequestsCompleted.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block_and_pvtdata",
|
|
"success", "true",
|
|
}))
|
|
})
|
|
})
|
|
|
|
Context("when sending the block fails", func() {
|
|
BeforeEach(func() {
|
|
fakeResponseSender.SendBlockResponseReturns(errors.New("send-fails"))
|
|
})
|
|
|
|
It("returns the error", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).To(MatchError("send-fails"))
|
|
})
|
|
})
|
|
|
|
It("sends a success response", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_SUCCESS))
|
|
})
|
|
|
|
It("HandleAttestation sends requested block", func() {
|
|
err := handler.HandleAttestation(context.Background(), server, envelope)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(1))
|
|
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(0)
|
|
Expect(b).To(Equal(&cb.Block{
|
|
Header: &cb.BlockHeader{Number: 100},
|
|
}))
|
|
})
|
|
|
|
Context("when sending the success status fails", func() {
|
|
BeforeEach(func() {
|
|
fakeResponseSender.SendStatusResponseReturns(errors.New("send-success-fails"))
|
|
})
|
|
|
|
It("returns the error", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).To(MatchError("send-success-fails"))
|
|
})
|
|
|
|
It("HandleAttestation returns the error", func() {
|
|
err := handler.HandleAttestation(context.Background(), server, envelope)
|
|
Expect(err).To(MatchError("send-success-fails"))
|
|
})
|
|
})
|
|
|
|
Context("when receive fails", func() {
|
|
BeforeEach(func() {
|
|
fakeReceiver.RecvReturns(nil, errors.New("oh bother"))
|
|
})
|
|
|
|
It("returns the error", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).To(MatchError("oh bother"))
|
|
})
|
|
})
|
|
|
|
Context("when unmarshalling the payload fails", func() {
|
|
BeforeEach(func() {
|
|
envelope.Payload = []byte("completely-bogus-data")
|
|
})
|
|
|
|
It("sends a bad request message", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when the payload header is nil", func() {
|
|
BeforeEach(func() {
|
|
envelope.Payload = protoutil.MarshalOrPanic(&cb.Payload{
|
|
Header: nil,
|
|
})
|
|
})
|
|
|
|
It("sends a bad request message", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
|
|
It("sends a bad envelope to HandleAttestation", func() {
|
|
err := handler.HandleAttestation(context.Background(), server, envelope)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when unmarshalling the channel header fails", func() {
|
|
BeforeEach(func() {
|
|
channelHeaderPayload = []byte("complete-nonsense")
|
|
})
|
|
|
|
It("sends a bad request message", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when the channel header timestamp is nil", func() {
|
|
BeforeEach(func() {
|
|
channelHeaderPayload = protoutil.MarshalOrPanic(&cb.ChannelHeader{
|
|
Timestamp: nil,
|
|
})
|
|
})
|
|
|
|
It("sends a bad request message", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when the channel header timestamp is out of the time window", func() {
|
|
BeforeEach(func() {
|
|
channelHeaderPayload = protoutil.MarshalOrPanic(&cb.ChannelHeader{
|
|
Timestamp: ×tamp.Timestamp{},
|
|
})
|
|
})
|
|
|
|
It("sends status bad request", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when the channel is not found", func() {
|
|
BeforeEach(func() {
|
|
fakeChainManager.GetChainReturns(nil)
|
|
})
|
|
|
|
It("sends status not found", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_NOT_FOUND))
|
|
})
|
|
})
|
|
|
|
Context("when the client disconnects before reading from the chain", func() {
|
|
var (
|
|
ctx context.Context
|
|
cancel func()
|
|
done chan struct{}
|
|
)
|
|
|
|
BeforeEach(func() {
|
|
done = make(chan struct{})
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
cancel()
|
|
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
|
|
<-done
|
|
return nil, cb.Status_BAD_REQUEST
|
|
}
|
|
})
|
|
|
|
AfterEach(func() {
|
|
close(done)
|
|
})
|
|
|
|
It("aborts the deliver stream", func() {
|
|
err := handler.Handle(ctx, server)
|
|
Expect(err).To(MatchError("context finished before block retrieved: context canceled"))
|
|
})
|
|
})
|
|
|
|
Context("when the chain errors before reading from the chain", func() {
|
|
BeforeEach(func() {
|
|
close(errCh)
|
|
})
|
|
|
|
It("sends status service unavailable", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeChain.ReaderCallCount()).To(Equal(0))
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_SERVICE_UNAVAILABLE))
|
|
})
|
|
|
|
Context("when the seek info requests a best effort error response", func() {
|
|
BeforeEach(func() {
|
|
seekInfo.ErrorResponse = ab.SeekInfo_BEST_EFFORT
|
|
})
|
|
|
|
It("replies with the desired blocks", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(1))
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_SUCCESS))
|
|
})
|
|
})
|
|
})
|
|
|
|
Context("when the chain errors while reading from the chain", func() {
|
|
var doneCh chan struct{}
|
|
|
|
BeforeEach(func() {
|
|
doneCh = make(chan struct{})
|
|
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
|
|
<-doneCh
|
|
return &cb.Block{}, cb.Status_INTERNAL_SERVER_ERROR
|
|
}
|
|
fakeChain.ReaderStub = func() blockledger.Reader {
|
|
close(errCh)
|
|
return fakeBlockReader
|
|
}
|
|
})
|
|
|
|
AfterEach(func() {
|
|
close(doneCh)
|
|
})
|
|
|
|
It("sends status service unavailable", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeChain.ReaderCallCount()).To(Equal(1))
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_SERVICE_UNAVAILABLE))
|
|
})
|
|
})
|
|
|
|
Context("when the access evaluation fails", func() {
|
|
BeforeEach(func() {
|
|
fakePolicyChecker.CheckPolicyReturns(errors.New("no-access-for-you"))
|
|
})
|
|
|
|
It("sends status not found", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_FORBIDDEN))
|
|
})
|
|
|
|
It("records requests received, (unsuccessful) requests completed, and (zero) blocks sent", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeRequestsReceived.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsReceived.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsReceived.WithCallCount()).To(Equal(1))
|
|
labelValues := fakeRequestsReceived.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block",
|
|
}))
|
|
|
|
Expect(fakeBlocksSent.AddCallCount()).To(Equal(0))
|
|
Expect(fakeBlocksSent.WithCallCount()).To(Equal(0))
|
|
|
|
Expect(fakeRequestsCompleted.AddCallCount()).To(Equal(1))
|
|
Expect(fakeRequestsCompleted.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
|
|
Expect(fakeRequestsCompleted.WithCallCount()).To(Equal(1))
|
|
labelValues = fakeRequestsCompleted.WithArgsForCall(0)
|
|
Expect(labelValues).To(Equal([]string{
|
|
"channel", "chain-id",
|
|
"filtered", "false",
|
|
"data_type", "block",
|
|
"success", "false",
|
|
}))
|
|
})
|
|
})
|
|
|
|
Context("when the access expires", func() {
|
|
BeforeEach(func() {
|
|
fakeChain.SequenceStub = func() uint64 {
|
|
return uint64(fakeChain.SequenceCallCount())
|
|
}
|
|
fakePolicyChecker.CheckPolicyReturnsOnCall(1, errors.New("no-access-for-you"))
|
|
})
|
|
|
|
It("sends status not found", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_FORBIDDEN))
|
|
|
|
Expect(fakePolicyChecker.CheckPolicyCallCount()).To(Equal(2))
|
|
})
|
|
})
|
|
|
|
Context("when unmarshalling seek info fails", func() {
|
|
BeforeEach(func() {
|
|
seekInfoPayload = []byte("complete-nonsense")
|
|
})
|
|
|
|
It("sends status bad request", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when seek start and stop are nil", func() {
|
|
BeforeEach(func() {
|
|
seekInfo = &ab.SeekInfo{Start: nil, Stop: nil}
|
|
})
|
|
|
|
It("sends status bad request", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when seek info start number is greater than stop number", func() {
|
|
BeforeEach(func() {
|
|
seekInfo = &ab.SeekInfo{
|
|
Start: seekNewest,
|
|
Stop: &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 99}},
|
|
},
|
|
}
|
|
})
|
|
|
|
It("sends status bad request", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
|
|
})
|
|
})
|
|
|
|
Context("when fail if not ready is set and the next block is unavailable", func() {
|
|
BeforeEach(func() {
|
|
fakeBlockReader.HeightReturns(1000)
|
|
fakeBlockReader.IteratorReturns(fakeBlockIterator, 1000)
|
|
|
|
seekInfo = &ab.SeekInfo{
|
|
Behavior: ab.SeekInfo_FAIL_IF_NOT_READY,
|
|
Start: &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1002}},
|
|
},
|
|
Stop: &ab.SeekPosition{
|
|
Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1003}},
|
|
},
|
|
}
|
|
})
|
|
|
|
It("sends status not found", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeBlockIterator.NextCallCount()).To(Equal(0))
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_NOT_FOUND))
|
|
})
|
|
})
|
|
|
|
Context("when next block status does not indicate success", func() {
|
|
BeforeEach(func() {
|
|
fakeBlockIterator.NextReturns(nil, cb.Status_UNKNOWN)
|
|
})
|
|
|
|
It("forwards the status response", func() {
|
|
err := handler.Handle(context.Background(), server)
|
|
Expect(err).NotTo(HaveOccurred())
|
|
|
|
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
|
|
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
|
|
Expect(resp).To(Equal(cb.Status_UNKNOWN))
|
|
})
|
|
})
|
|
})
|
|
})
|