/* Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ package privdata import ( "encoding/asn1" "encoding/hex" "errors" "fmt" "reflect" "testing" "time" pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" proto "github.com/hyperledger/fabric-protos-go/gossip" "github.com/hyperledger/fabric-protos-go/ledger/rwset" "github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset" mspproto "github.com/hyperledger/fabric-protos-go/msp" "github.com/hyperledger/fabric-protos-go/peer" tspb "github.com/hyperledger/fabric-protos-go/transientstore" "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/common/metrics/disabled" util2 "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/common/privdata" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/gossip/metrics" gmetricsmocks "github.com/hyperledger/fabric/gossip/metrics/mocks" privdatacommon "github.com/hyperledger/fabric/gossip/privdata/common" privdatamocks "github.com/hyperledger/fabric/gossip/privdata/mocks" "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/msp" mspmgmt "github.com/hyperledger/fabric/msp/mgmt" msptesttools "github.com/hyperledger/fabric/msp/mgmt/testtools" "github.com/hyperledger/fabric/protoutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) var testConfig = CoordinatorConfig{ PullRetryThreshold: time.Second * 3, TransientBlockRetention: 1000, SkipPullingInvalidTransactions: false, } // CollectionCriteria aggregates criteria of // a collection type CollectionCriteria struct { Channel string Collection string Namespace string } func fromCollectionCriteria(criteria privdata.CollectionCriteria) CollectionCriteria { return CollectionCriteria{ Collection: criteria.Collection, Namespace: 
criteria.Namespace, Channel: criteria.Channel, } } type validatorMock struct { err error } func (v *validatorMock) Validate(block *common.Block) error { if v.err != nil { return v.err } return nil } type digests []privdatacommon.DigKey func (d digests) Equal(other digests) bool { flatten := func(d digests) map[privdatacommon.DigKey]struct{} { m := map[privdatacommon.DigKey]struct{}{} for _, dig := range d { m[dig] = struct{}{} } return m } return reflect.DeepEqual(flatten(d), flatten(other)) } type fetchCall struct { fetcher *fetcherMock *mock.Call } func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall { if fc.fetcher.expectedEndorsers == nil { fc.fetcher.expectedEndorsers = make(map[string]struct{}) } for _, org := range orgs { sID := &mspproto.SerializedIdentity{Mspid: org, IdBytes: []byte(fmt.Sprintf("p0%s", org))} b, _ := pb.Marshal(sID) fc.fetcher.expectedEndorsers[string(b)] = struct{}{} } return fc } func (fc *fetchCall) expectingDigests(digests []privdatacommon.DigKey) *fetchCall { fc.fetcher.expectedDigests = digests return fc } func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call { return fc.Call.Return(returnArguments...) 
} type fetcherMock struct { t *testing.T mock.Mock expectedDigests []privdatacommon.DigKey expectedEndorsers map[string]struct{} } func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall { return &fetchCall{ fetcher: f, Call: f.Mock.On(methodName, arguments...), } } func (f *fetcherMock) fetch(dig2src dig2sources) (*privdatacommon.FetchedPvtDataContainer, error) { uniqueEndorsements := make(map[string]interface{}) for _, endorsements := range dig2src { for _, endorsement := range endorsements { _, exists := f.expectedEndorsers[string(endorsement.Endorser)] if !exists { f.t.Fatalf("Encountered a non-expected endorser: %s", string(endorsement.Endorser)) } uniqueEndorsements[string(endorsement.Endorser)] = struct{}{} } } require.True(f.t, digests(f.expectedDigests).Equal(digests(dig2src.keys()))) require.Equal(f.t, len(f.expectedEndorsers), len(uniqueEndorsements)) args := f.Called(dig2src) if args.Get(1) == nil { return args.Get(0).(*privdatacommon.FetchedPvtDataContainer), nil } return nil, args.Get(1).(error) } type testTransientStore struct { storeProvider transientstore.StoreProvider store *transientstore.Store tempdir string } func newTransientStore(t *testing.T) *testTransientStore { s := &testTransientStore{} var err error s.tempdir = t.TempDir() s.storeProvider, err = transientstore.NewStoreProvider(s.tempdir) if err != nil { t.Fatalf("Failed to open store, got err %s", err) return s } s.store, err = s.storeProvider.OpenStore("testchannelid") if err != nil { t.Fatalf("Failed to open store, got err %s", err) return s } return s } func (s *testTransientStore) tearDown() { s.storeProvider.Close() } func (s *testTransientStore) Persist(txid string, blockHeight uint64, privateSimulationResultsWithConfig *tspb.TxPvtReadWriteSetWithConfigInfo) error { return s.store.Persist(txid, blockHeight, privateSimulationResultsWithConfig) } func (s *testTransientStore) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, 
error) { return s.store.GetTxPvtRWSetByTxid(txid, filter) } func createcollectionStore(expectedSignedData protoutil.SignedData) *collectionStore { return &collectionStore{ expectedSignedData: expectedSignedData, policies: make(map[collectionAccessPolicy]CollectionCriteria), store: make(map[CollectionCriteria]collectionAccessPolicy), } } type collectionStore struct { expectedSignedData protoutil.SignedData acceptsAll bool acceptsNone bool lenient bool mspIdentifier string store map[CollectionCriteria]collectionAccessPolicy policies map[collectionAccessPolicy]CollectionCriteria } func (cs *collectionStore) thatAcceptsAll() *collectionStore { cs.acceptsAll = true return cs } func (cs *collectionStore) thatAcceptsNone() *collectionStore { cs.acceptsNone = true return cs } func (cs *collectionStore) thatAccepts(cc CollectionCriteria) *collectionStore { sp := collectionAccessPolicy{ cs: cs, n: util.RandomUInt64(), } cs.store[cc] = sp cs.policies[sp] = cc return cs } func (cs *collectionStore) withMSPIdentity(identifier string) *collectionStore { cs.mspIdentifier = identifier return cs } func (cs *collectionStore) RetrieveCollectionAccessPolicy(cc privdata.CollectionCriteria) (privdata.CollectionAccessPolicy, error) { if sp, exists := cs.store[fromCollectionCriteria(cc)]; exists { return &sp, nil } if cs.acceptsAll || cs.acceptsNone || cs.lenient { return &collectionAccessPolicy{ cs: cs, n: util.RandomUInt64(), }, nil } return nil, privdata.NoSuchCollectionError{} } func (cs *collectionStore) RetrieveCollection(privdata.CollectionCriteria) (privdata.Collection, error) { panic("implement me") } func (cs *collectionStore) RetrieveCollectionConfig(cc privdata.CollectionCriteria) (*peer.StaticCollectionConfig, error) { mspIdentifier := "different-org" if _, exists := cs.store[fromCollectionCriteria(cc)]; exists || cs.acceptsAll { mspIdentifier = cs.mspIdentifier } return &peer.StaticCollectionConfig{ Name: cc.Collection, MemberOnlyRead: true, MemberOrgsPolicy: 
&peer.CollectionPolicyConfig{ Payload: &peer.CollectionPolicyConfig_SignaturePolicy{ SignaturePolicy: &common.SignaturePolicyEnvelope{ Rule: &common.SignaturePolicy{ Type: &common.SignaturePolicy_SignedBy{ SignedBy: 0, }, }, Identities: []*mspproto.MSPPrincipal{ { PrincipalClassification: mspproto.MSPPrincipal_ROLE, Principal: protoutil.MarshalOrPanic(&mspproto.MSPRole{ MspIdentifier: mspIdentifier, Role: mspproto.MSPRole_MEMBER, }), }, }, }, }, }, }, nil } func (cs *collectionStore) RetrieveReadWritePermission(cc privdata.CollectionCriteria, sp *peer.SignedProposal, qe ledger.QueryExecutor) (bool, bool, error) { panic("implement me") } func (cs *collectionStore) RetrieveCollectionConfigPackage(cc privdata.CollectionCriteria) (*peer.CollectionConfigPackage, error) { return &peer.CollectionConfigPackage{ Config: []*peer.CollectionConfig{ { Payload: &peer.CollectionConfig_StaticCollectionConfig{ StaticCollectionConfig: &peer.StaticCollectionConfig{ Name: cc.Collection, MaximumPeerCount: 1, RequiredPeerCount: 1, }, }, }, }, }, nil } func (cs *collectionStore) RetrieveCollectionPersistenceConfigs(cc privdata.CollectionCriteria) (privdata.CollectionPersistenceConfigs, error) { panic("implement me") } func (cs *collectionStore) AccessFilter(channelName string, collectionPolicyConfig *peer.CollectionPolicyConfig) (privdata.Filter, error) { panic("implement me") } type collectionAccessPolicy struct { cs *collectionStore n uint64 } func (cap *collectionAccessPolicy) MemberOrgs() map[string]struct{} { return map[string]struct{}{ "org0": {}, "org1": {}, } } func (cap *collectionAccessPolicy) RequiredPeerCount() int { return 1 } func (cap *collectionAccessPolicy) MaximumPeerCount() int { return 2 } func (cap *collectionAccessPolicy) IsMemberOnlyRead() bool { return false } func (cap *collectionAccessPolicy) IsMemberOnlyWrite() bool { return false } func (cap *collectionAccessPolicy) AccessFilter() privdata.Filter { return func(sd protoutil.SignedData) bool { that, _ := 
asn1.Marshal(sd)
		this, _ := asn1.Marshal(cap.cs.expectedSignedData)
		if hex.EncodeToString(that) != hex.EncodeToString(this) {
			panic(fmt.Errorf("self signed data passed isn't equal to expected:%v, %v", sd, cap.cs.expectedSignedData))
		}

		if cap.cs.acceptsNone {
			return false
		} else if cap.cs.acceptsAll {
			return true
		}

		_, exists := cap.cs.policies[*cap]
		return exists
	}
}

// TestPvtDataCollections_FailOnEmptyPayload checks that marshalling a
// collection containing a nil entry fails with the expected message.
func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
	collection := &util.PvtDataCollections{
		&ledger.TxPvtData{
			SeqInBlock: uint64(1),
			WriteSet: &rwset.TxPvtReadWriteSet{
				DataModel: rwset.TxReadWriteSet_KV,
				NsPvtRwset: []*rwset.NsPvtReadWriteSet{
					{
						Namespace: "ns1",
						CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
							{
								CollectionName: "secretCollection",
								Rwset:          []byte{1, 2, 3, 4, 5, 6, 7},
							},
						},
					},
				},
			},
		},

		nil,
	}

	_, err := collection.Marshal()
	assertion := require.New(t)
	assertion.Error(err, "Expected to fail since second item has nil payload")
	// Compare the error text directly instead of formatting it through fmt.
	assertion.EqualError(err, "Mallformed private data payload, rwset index 1 is nil")
}

// TestPvtDataCollections_FailMarshalingWriteSet checks that marshalling an
// entry with a nil write set fails and names the offending index.
func TestPvtDataCollections_FailMarshalingWriteSet(t *testing.T) {
	collection := &util.PvtDataCollections{
		&ledger.TxPvtData{
			SeqInBlock: uint64(1),
			WriteSet:   nil,
		},
	}

	_, err := collection.Marshal()
	assertion := require.New(t)
	assertion.Error(err, "Expected to fail since first item has nil writeset")
	assertion.Contains(err.Error(), "Could not marshal private rwset index 0")
}

// TestPvtDataCollections_Marshal checks that a two-entry collection marshals
// into two byte slices.
func TestPvtDataCollections_Marshal(t *testing.T) {
	collection := &util.PvtDataCollections{
		&ledger.TxPvtData{
			SeqInBlock: uint64(1),
			WriteSet: &rwset.TxPvtReadWriteSet{
				DataModel: rwset.TxReadWriteSet_KV,
				NsPvtRwset: []*rwset.NsPvtReadWriteSet{
					{
						Namespace: "ns1",
						CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
							{
								CollectionName: "secretCollection",
								Rwset:          []byte{1, 2, 3, 4, 5, 6, 7},
							},
						},
					},
				},
			},
		},

		&ledger.TxPvtData{
			SeqInBlock: uint64(2),
			WriteSet: &rwset.TxPvtReadWriteSet{
				DataModel: rwset.TxReadWriteSet_KV,
				NsPvtRwset: []*rwset.NsPvtReadWriteSet{
					{
Namespace: "ns1", CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ { CollectionName: "secretCollection", Rwset: []byte{42, 42, 42, 42, 42, 42, 42}, }, }, }, { Namespace: "ns2", CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ { CollectionName: "otherCollection", Rwset: []byte{10, 9, 8, 7, 6, 5, 4, 3, 2, 1}, }, }, }, }, }, }, } bytes, err := collection.Marshal() assertion := require.New(t) assertion.NoError(err) assertion.NotNil(bytes) assertion.Equal(2, len(bytes)) } func TestPvtDataCollections_Unmarshal(t *testing.T) { collection := util.PvtDataCollections{ &ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: &rwset.TxPvtReadWriteSet{ DataModel: rwset.TxReadWriteSet_KV, NsPvtRwset: []*rwset.NsPvtReadWriteSet{ { Namespace: "ns1", CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ { CollectionName: "secretCollection", Rwset: []byte{1, 2, 3, 4, 5, 6, 7}, }, }, }, }, }, }, } bytes, err := collection.Marshal() assertion := require.New(t) assertion.NoError(err) assertion.NotNil(bytes) assertion.Equal(1, len(bytes)) var newCol util.PvtDataCollections err = newCol.Unmarshal(bytes) assertion.NoError(err) assertion.Equal(1, len(newCol)) assertion.Equal(newCol[0].SeqInBlock, collection[0].SeqInBlock) assertion.True(pb.Equal(newCol[0].WriteSet, collection[0].WriteSet)) } type rwsTriplet struct { namespace string collection string rwset string } func flattenTxPvtDataMap(pd ledger.TxPvtDataMap) map[uint64]map[rwsTriplet]struct{} { m := make(map[uint64]map[rwsTriplet]struct{}) for seqInBlock, namespaces := range pd { triplets := make(map[rwsTriplet]struct{}) for _, namespace := range namespaces.WriteSet.NsPvtRwset { for _, col := range namespace.CollectionPvtRwset { triplets[rwsTriplet{ namespace: namespace.Namespace, collection: col.CollectionName, rwset: hex.EncodeToString(col.Rwset), }] = struct{}{} } } m[seqInBlock] = triplets } return m } var expectedCommittedPrivateData1 = map[uint64]*ledger.TxPvtData{ 0: {SeqInBlock: 0, WriteSet: 
&rwset.TxPvtReadWriteSet{
		DataModel: rwset.TxReadWriteSet_KV,
		NsPvtRwset: []*rwset.NsPvtReadWriteSet{
			{
				Namespace: "ns1",
				CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
					{
						CollectionName: "c1",
						Rwset:          []byte("rws-pre-image"),
					},
					{
						CollectionName: "c2",
						Rwset:          []byte("rws-pre-image"),
					},
				},
			},
		},
	}},
	1: {SeqInBlock: 1, WriteSet: &rwset.TxPvtReadWriteSet{
		DataModel: rwset.TxReadWriteSet_KV,
		NsPvtRwset: []*rwset.NsPvtReadWriteSet{
			{
				Namespace: "ns2",
				CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
					{
						CollectionName: "c1",
						Rwset:          []byte("rws-pre-image"),
					},
				},
			},
		},
	}},
}

// expectedCommittedPrivateData2 holds a single transaction (seq 0) carrying
// ns3:c3 with payload "rws-pre-image".
// NOTE(review): its consumers are outside this view — presumably later
// scenarios of the coordinator tests; confirm before changing.
var expectedCommittedPrivateData2 = map[uint64]*ledger.TxPvtData{
	0: {SeqInBlock: 0, WriteSet: &rwset.TxPvtReadWriteSet{
		DataModel: rwset.TxReadWriteSet_KV,
		NsPvtRwset: []*rwset.NsPvtReadWriteSet{
			{
				Namespace: "ns3",
				CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
					{
						CollectionName: "c3",
						Rwset:          []byte("rws-pre-image"),
					},
				},
			},
		},
	}},
}

// expectedCommittedPrivateData3 is the empty expectation (no private data).
var expectedCommittedPrivateData3 = map[uint64]*ledger.TxPvtData{}

// TestCoordinatorStoreInvalidBlock drives Coordinator.StoreBlock through a
// series of scenarios involving malformed blocks and invalid transactions.
func TestCoordinatorStoreInvalidBlock(t *testing.T) {
	// Load the test MSP and build the self-signed data the collection store
	// expects to be presented to its access filters.
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	// Note: this local shadows the imported metrics package for the rest of
	// the function.
	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	committer := &privdatamocks.Committer{}
	// With this committer, any commit attempt fails the test immediately.
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		t.Fatal("Shouldn't have committed")
}).Return(nil)
	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())

	store := newTransientStore(t)
	defer store.tearDown()

	// assertPurged fails the test unless the transient store holds no private
	// data for any of the given transaction IDs.
	assertPurged := func(txns ...string) {
		for _, txn := range txns {
			iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
			if err != nil {
				t.Fatalf("Failed iterating, got err %s", err)
			}
			res, err := iterator.Next()
			// Release the iterator before asserting: a failed assertion aborts
			// the test and would otherwise leave the iterator open.
			iterator.Close()
			if err != nil {
				t.Fatalf("Failed iterating, got err %s", err)
			}
			require.Nil(t, res)
		}
	}

	fetcher := &fetcherMock{t: t}
	pdFactory := &pvtDataFactory{}
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})
	block := bf.withoutMetadata().create()
	// Scenario I: Block we got doesn't have any metadata with it
	pvtData := pdFactory.create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.Error(t, err)
	require.Contains(t, err.Error(), "Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")

	// Scenario II: Validator has an error while validating the block
	block = bf.create()
	pvtData = pdFactory.create()
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{fmt.Errorf("failed validating block")},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.Error(t, err)
	require.Contains(t, err.Error(), "failed validating block")

	// Scenario III: Block we got contains an inadequate length of Tx filter in the metadata
	block = bf.withMetadataSize(100).create()
	pvtData = pdFactory.create()
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.Error(t, err)
	require.Contains(t, err.Error(), "block data size")
	require.Contains(t, err.Error(), "is different from Tx filter size")

	// Scenario IV: The second transaction in the block we got is invalid, and we have no private data for that.
	// As StorePvtDataOfInvalidTx is set to false, if the coordinator tried to fetch private data, the
	// test would fail because we haven't defined the mock operations for the transientstore (or for gossip)
	// in this test.
	var commitHappened bool
	// assertCommitHappened verifies a commit took place and re-arms the flag
	// for the next scenario.
	assertCommitHappened := func() {
		require.True(t, commitHappened)
		commitHappened = false
	}
	digKeys := []privdatacommon.DigKey{
		{
			TxId:       "tx2",
			Namespace:  "ns2",
			Collection: "c1",
			BlockSeq:   1,
			SeqInBlock: 1,
		},
	}
	fetcher = &fetcherMock{t: t}
	fetcher.On("fetch", mock.Anything).expectingDigests(digKeys).expectingEndorsers(identity.GetMSPIdentifier()).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: nil,
	}, nil)
	committer = &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
		commitHappened = true
		// Only the first transaction's private data is passed to the ledger
		require.Len(t, privateDataPassed2Ledger, 1)
		require.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
		// The private data passed to the ledger contains "ns1" and has 2 collections in it
		require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
		require.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
		require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)
	}).Return(nil)
	block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider = &privdatamocks.CapabilityProvider{}
	appCapability = &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(false)
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err =
coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	// Ensure the 2nd transaction which is invalid and wasn't committed - is still purged.
	// This is so that if we get a transaction via dissemination from an endorser, we purge it
	// when its block comes.
	assertPurged("tx1", "tx2")

	// Scenario V: The second transaction in the block we got is invalid, and we have no private
	// data for that in the transient store. As we have set StorePvtDataOfInvalidTx to true and
	// configured the coordinator to skip pulling pvtData of invalid transactions from other peers,
	// it should not store the pvtData of the invalid transaction in the ledger but record a
	// missing entry instead.

	// testConfig is package-level state shared with other tests. Restore it
	// when this test exits, so that an assertion failing before the explicit
	// reset below cannot leak the modified setting into other tests.
	defer func(orig bool) {
		testConfig.SkipPullingInvalidTransactions = orig
	}(testConfig.SkipPullingInvalidTransactions)
	testConfig.SkipPullingInvalidTransactions = true

	committer = &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		blockAndPvtData := args.Get(0).(*ledger.BlockAndPvtData)
		commitHappened = true
		// Only the first transaction's private data is passed to the ledger
		privateDataPassed2Ledger := blockAndPvtData.PvtData
		require.Len(t, privateDataPassed2Ledger, 1)
		require.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
		// The private data passed to the ledger contains "ns1" and has 2 collections in it
		require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
		require.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
		require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)

		// The invalid transaction (seq 1) must be reported as missing ns2:c1.
		// Arguments follow testify's (expected, actual) order so that failure
		// messages label the values correctly.
		missingPrivateDataPassed2Ledger := blockAndPvtData.MissingPvtData
		require.Len(t, missingPrivateDataPassed2Ledger, 1)
		require.Len(t, missingPrivateDataPassed2Ledger[1], 1)
		require.Equal(t, "ns2", missingPrivateDataPassed2Ledger[1][0].Namespace)
		require.Equal(t, "c1", missingPrivateDataPassed2Ledger[1][0].Collection)
		require.True(t, missingPrivateDataPassed2Ledger[1][0].IsEligible)

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider = &privdatamocks.CapabilityProvider{}
	appCapability = &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	digKeys = []privdatacommon.DigKey{}
	fetcher = &fetcherMock{t: t}
	fetcher.On("fetch", mock.Anything).expectingDigests(digKeys).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: nil,
	}, nil)
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurged("tx1", "tx2")

	// Scenario VI: The second transaction in the block we got is invalid. As we have set the
	// StorePvtDataOfInvalidTx to true and configured the coordinator to pull pvtData of invalid
	// transactions, it should store the pvtData of invalid transactions in the ledger.
	testConfig.SkipPullingInvalidTransactions = false
	committer = &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		blockAndPvtData := args.Get(0).(*ledger.BlockAndPvtData)
		commitHappened = true
		// pvtData of both transactions must be present though the second transaction
		// is invalid.
privateDataPassed2Ledger := blockAndPvtData.PvtData
		require.Len(t, privateDataPassed2Ledger, 2)
		require.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
		require.Equal(t, 1, int(privateDataPassed2Ledger[1].SeqInBlock))
		// The private data passed to the ledger for tx1 contains "ns1" and has 2 collections in it
		require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
		require.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
		require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)
		// The private data passed to the ledger for tx2 contains "ns2" and has 1 collection in it
		require.Len(t, privateDataPassed2Ledger[1].WriteSet.NsPvtRwset, 1)
		require.Equal(t, "ns2", privateDataPassed2Ledger[1].WriteSet.NsPvtRwset[0].Namespace)
		require.Len(t, privateDataPassed2Ledger[1].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 1)

		missingPrivateDataPassed2Ledger := blockAndPvtData.MissingPvtData
		require.Len(t, missingPrivateDataPassed2Ledger, 0)

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	fetcher = &fetcherMock{t: t}
	fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
		{
			TxId:       "tx2",
			Namespace:  "ns2",
			Collection: "c1",
			BlockSeq:   1,
			SeqInBlock: 1,
		},
	}).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: []*proto.PvtDataElement{
			{
				Digest: &proto.PvtDataDigest{
					SeqInBlock: 1,
					BlockSeq:   1,
					Collection: "c1",
					Namespace:  "ns2",
					TxId:       "tx2",
				},
				Payload: [][]byte{[]byte("rws-pre-image")},
			},
		},
	}, nil)

	block = bf.withInvalidTxns(1).AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
		AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()
	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurged("tx1", "tx2")

	// Scenario VII: Block doesn't contain a header
	block.Header = nil
	err = coordinator.StoreBlock(block, pvtData)
	require.Error(t, err)
	require.Contains(t, err.Error(), "Block header is nil")

	// Scenario VIII: Block doesn't contain Data
	block.Data = nil
	err = coordinator.StoreBlock(block, pvtData)
	require.Error(t, err)
	require.Contains(t, err.Error(), "Block data is empty")
}

func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
	/*
		Test case, where peer receives new block for commit
		it has ns1:c1 in transient store, while it has wrong
		hash, hence it will fetch ns1:c1 from other peers
	*/
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	expectedPvtData := map[uint64]*ledger.TxPvtData{
		0: {SeqInBlock: 0, WriteSet: &rwset.TxPvtReadWriteSet{
			DataModel: rwset.TxReadWriteSet_KV,
			NsPvtRwset: []*rwset.NsPvtReadWriteSet{
				{
					Namespace: "ns1",
					CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
						{
							CollectionName: "c1",
							Rwset:          []byte("rws-original"),
						},
					},
				},
			},
		}},
	}

	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
	committer := &privdatamocks.Committer{}

	store := newTransientStore(t)
	defer store.tearDown()

	// assertPurged fails the test unless the transient store holds no private
	// data for any of the given transaction IDs.
	assertPurged := func(txns ...string) {
		for _, txn := range txns {
			iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
			if err != nil {
				t.Fatalf("Failed iterating, got err %s", err)
			}
			res, err := iterator.Next()
			// Release the iterator before asserting: a failed assertion aborts
			// the test and would otherwise leave the iterator open.
			iterator.Close()
			if err != nil {
				t.Fatalf("Failed iterating, got err %s", err)
			}
			require.Nil(t, res)
		}
	}

	fetcher := &fetcherMock{t: t}

	var commitHappened bool

	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
		require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
			flattenTxPvtDataMap(expectedPvtData)))
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	hash := util2.ComputeSHA256([]byte("rws-original"))
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)

	fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
		{
			TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1,
		},
	}).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: []*proto.PvtDataElement{
			{
				Digest: &proto.PvtDataDigest{
					BlockSeq:   1,
					Collection: "c1",
					Namespace:  "ns1",
					TxId:       "tx1",
				},
				Payload: [][]byte{[]byte("rws-original")},
			},
		},
	}, nil)

	coordinator.StoreBlock(block, nil)
	// Assert the block was eventually committed
	require.True(t, commitHappened)

	// Assert transaction has been purged
	assertPurged("tx1")
}

// TestCoordinatorStoreBlock drives StoreBlock through seven scenarios that
// differ in where the private data comes from (alongside the block, the
// transient store, or a remote peer fetch) and asserts for each one the
// commit outcome and that the block's transactions are purged from the
// transient store.
func TestCoordinatorStoreBlock(t *testing.T) {
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	// Green path test, all private data should be obtained successfully

	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())

	// commitHappened is set by the CommitLegacy mock and reset after every
	// assertion so that each scenario has to trigger its own commit.
	var commitHappened bool
	assertCommitHappened := func() {
		require.True(t, commitHappened)
		commitHappened = false
	}
	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
		require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
			flattenTxPvtDataMap(expectedCommittedPrivateData1)))
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	store := newTransientStore(t)
	defer store.tearDown()

	// assertPurged reports whether none of the given transactions has an entry
	// left in the transient store. It is polled through require.Eventually,
	// which runs the condition outside the test goroutine, so failures are
	// reported with t.Errorf plus a false result instead of t.Fatalf. The
	// iterator is only closed when it was obtained successfully.
	assertPurged := func(txns ...string) bool {
		for _, txn := range txns {
			iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
			if err != nil {
				t.Errorf("Failed iterating, got err %s", err)
				return false
			}
			res, err := iterator.Next()
			iterator.Close()
			if err != nil {
				t.Errorf("Failed iterating, got err %s", err)
				return false
			}
			if res != nil {
				return false
			}
		}
		return true
	}

	fetcher := &fetcherMock{t: t}

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	pdFactory := &pvtDataFactory{}
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
		AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	fmt.Println("Scenario I")
	// Scenario I: Block we got has sufficient private data alongside it.
	// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
	// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
	pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurgeTxs := func() bool {
		return assertPurged("tx1", "tx2")
	}
	require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)

	fmt.Println("Scenario II")
	// Scenario II: Block we got doesn't have sufficient private data alongside it,
	// it is missing ns1: c2, but the data exists in the transient store
	store.Persist("tx1", 1, &tspb.TxPvtReadWriteSetWithConfigInfo{
		PvtRwset: &rwset.TxPvtReadWriteSet{
			NsPvtRwset: []*rwset.NsPvtReadWriteSet{
				{
					Namespace: "ns1",
					CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
						{
							CollectionName: "c2",
							Rwset:          []byte("rws-pre-image"),
						},
					},
				},
			},
		},
		CollectionConfigs: make(map[string]*peer.CollectionConfigPackage),
	})
	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1").addRWSet().addNSRWSet("ns2", "c1").create()
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurgeTxs = func() bool {
		return assertPurged("tx1", "tx2")
	}
	require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)

	fmt.Println("Scenario III")
	// Scenario III: Block doesn't have sufficient private data alongside it,
	// it is missing ns1: c2, and the data exists in the transient store,
	// but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer.
	// Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since
	// the MemberOrgs() call doesn't return org2 but only org0 and org1.
	fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
		{
			TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1,
		},
		{
			TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1,
		},
	}).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: []*proto.PvtDataElement{
			{
				Digest: &proto.PvtDataDigest{
					BlockSeq:   1,
					Collection: "c2",
					Namespace:  "ns1",
					TxId:       "tx1",
				},
				Payload: [][]byte{[]byte("rws-pre-image")},
			},
			{
				Digest: &proto.PvtDataDigest{
					SeqInBlock: 1,
					BlockSeq:   1,
					Collection: "c1",
					Namespace:  "ns2",
					TxId:       "tx2",
				},
				Payload: [][]byte{[]byte("rws-pre-image")},
			},
		},
	}, nil)
	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1").create()
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurgeTxs = func() bool {
		return assertPurged("tx1", "tx2")
	}
	require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)

	fmt.Println("Scenario IV")
	// Scenario IV: Block came with more than sufficient private data alongside it, some of it is redundant.
	pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2", "c3").
		addRWSet().addNSRWSet("ns2", "c1", "c3").addRWSet().addNSRWSet("ns1", "c4").create()
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurgeTxs = func() bool {
		return assertPurged("tx1", "tx2")
	}
	require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)

	fmt.Println("Scenario V")
	// Scenario V: Block we got has private data alongside it but coordinator cannot retrieve collection access
	// policy of collections due to database unavailability error.
	// We verify that the error propagates properly.
	mockCs := &privdatamocks.CollectionStore{}
	mockCs.On("RetrieveCollectionConfig", mock.Anything).Return(nil, errors.New("test error"))
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    mockCs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, nil)
	require.Error(t, err)
	require.Equal(t, "test error", err.Error())

	fmt.Println("Scenario VI")
	// Scenario VI: Block arrived without any private data alongside it, and the transient store
	// has some problem.
	// In this case, we should try to fetch data from peers.
	// NOTE(review): nothing visible here injects a transient store failure; the
	// store simply holds no entry for tx3 - confirm the intended setup.
	block = bf.AddTxn("tx3", "ns3", hash, "c3").create()
	fetcher = &fetcherMock{t: t}
	fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
		{
			TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1,
		},
	}).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: []*proto.PvtDataElement{
			{
				Digest: &proto.PvtDataDigest{
					BlockSeq:   1,
					Collection: "c3",
					Namespace:  "ns3",
					TxId:       "tx3",
				},
				Payload: [][]byte{[]byte("rws-pre-image")},
			},
		},
	}, nil)
	committer = &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
		require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
			flattenTxPvtDataMap(expectedCommittedPrivateData2)))
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, nil)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurgeTxs = func() bool {
		return assertPurged("tx3")
	}
	require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)

	fmt.Println("Scenario VII")
	// Scenario VII: Block contains 2 transactions, and the peer is eligible for only tx3-ns3-c3.
	// Also, the block comes with private data for tx3-ns3-c3 so that the peer won't have to fetch the
	// private data from the transient store or peers, and in fact - if it attempts to fetch the data it's not eligible
	// for from the transient store or from peers - the test would fail because the Mock wasn't initialized.
	block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
	cs = createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
		Collection: "c3",
		Namespace:  "ns3",
		Channel:    "testchannelid",
	}).withMSPIdentity(identity.GetMSPIdentifier())
	fetcher = &fetcherMock{t: t}
	committer = &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
		require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
			flattenTxPvtDataMap(expectedCommittedPrivateData2)))
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
	coordinator = NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig,
idDeserializerFactory)

	pvtData = pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	// In any case, all transactions in the block are purged from the transient store
	assertPurgeTxs = func() bool {
		return assertPurged("tx3", "tx1")
	}
	require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
}

// TestCoordinatorStoreBlockWhenPvtDataExistInLedger checks StoreBlock when the
// committer mock reports that the private data info already exists in the
// ledger: the commit must carry an empty TxPvtDataMap and CommitOptions with
// FetchPvtDataFromLedger set to true. The coordinator is built with a nil
// collection store and a nil transient store.
func TestCoordinatorStoreBlockWhenPvtDataExistInLedger(t *testing.T) {
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	// commitHappened is flipped by the CommitLegacy mock below and reset once
	// it has been asserted.
	var commitHappened bool
	assertCommitHappened := func() {
		require.True(t, commitHappened)
		commitHappened = false
	}
	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		// No private data is expected to be handed to the ledger; the commit
		// options must ask the ledger to fetch it instead.
		privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
		require.Equal(t, ledger.TxPvtDataMap{}, privateDataPassed2Ledger)
		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: true}
		require.Equal(t, expectedCommitOpts, commitOpts)
		commitHappened = true
	}).Return(nil)

	fetcher := &fetcherMock{t: t}

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	pdFactory := &pvtDataFactory{}
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
		AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()

	// Scenario: Block we got has been reprocessed and hence the sufficient pvtData is present
	// in the local pvtdataStore itself. The pvtData would be fetched from the local pvtdataStore.
	// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
	// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
	pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(true, nil)

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	// Both the collection store and the transient store are nil in this test.
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    nil,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, nil, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
}

// TestProceedWithoutPrivateData checks that StoreBlock still commits the block
// when part of its private data cannot be obtained.
func TestProceedWithoutPrivateData(t *testing.T) {
	// Scenario: we are missing private data (c2 in ns3) and it cannot be obtained from any peer.
	// Block needs to be committed with missing private data.
err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}
	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())

	// commitHappened is flipped by the CommitLegacy mock below and reset once
	// it has been asserted.
	var commitHappened bool
	assertCommitHappened := func() {
		require.True(t, commitHappened)
		commitHappened = false
	}
	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
		privateDataPassed2Ledger := blockAndPrivateData.PvtData
		require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
			flattenTxPvtDataMap(expectedCommittedPrivateData2)))
		// The commit is expected to list ns3/c2 of transaction 0 as missing.
		missingPrivateData := blockAndPrivateData.MissingPvtData
		expectedMissingPvtData := make(ledger.TxMissingPvtData)
		expectedMissingPvtData.Add(0, "ns3", "c2", true)
		require.Equal(t, expectedMissingPvtData, missingPrivateData)
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	store := newTransientStore(t)
	defer store.tearDown()

	// assertPurged fails the test unless every given transaction has no entry
	// left in the transient store. The iterator is closed right after the
	// single read so it is released even when a later assertion aborts the test.
	assertPurged := func(txns ...string) {
		for _, txn := range txns {
			iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
			if err != nil {
				t.Fatalf("Failed iterating, got err %s", err)
			}
			res, err := iterator.Next()
			iterator.Close()
			if err != nil {
				t.Fatalf("Failed iterating, got err %s", err)
			}
			require.Nil(t, res)
		}
	}

	fetcher := &fetcherMock{t: t}
	// Have the peer return in response to the pull, a private data with a non matching hash
	fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
		{
			TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1,
		},
	}).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: []*proto.PvtDataElement{
			{
				Digest: &proto.PvtDataDigest{
					BlockSeq:   1,
					Collection: "c2",
					Namespace:  "ns3",
					TxId:       "tx1",
				},
				Payload: [][]byte{[]byte("wrong pre-image")},
			},
		},
	}, nil)

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	pdFactory := &pvtDataFactory{}
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	// The block references c3 and c2 in ns3, but only c3 is supplied with it.
	block := bf.AddTxn("tx1", "ns3", hash, "c3", "c2").create()
	pvtData := pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	assertCommitHappened()
	assertPurged("tx1")
}

// TestProceedWithInEligiblePrivateData checks that StoreBlock commits a block
// whose private data this peer is not eligible for.
func TestProceedWithInEligiblePrivateData(t *testing.T) {
	// Scenario: the block references private data (c2 in ns3), but the collection store
	// accepts no collection for this peer and no fetcher is configured.
	// Block needs to be committed with that private data recorded as missing
	// (the expected entry is added with `false`, presumably the eligibility flag).
err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	// The collection store is configured to accept no collection.
	cs := createcollectionStore(peerSelfSignedData).thatAcceptsNone().withMSPIdentity(identity.GetMSPIdentifier())

	// commitHappened is flipped by the CommitLegacy mock below and reset once
	// it has been asserted.
	var commitHappened bool
	assertCommitHappened := func() {
		require.True(t, commitHappened)
		commitHappened = false
	}
	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
		privateDataPassed2Ledger := blockAndPrivateData.PvtData
		require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
			flattenTxPvtDataMap(expectedCommittedPrivateData3)))
		// ns3/c2 of transaction 0 must be reported as missing; the last
		// argument is false here (presumably the eligibility flag - confirm
		// against ledger.TxMissingPvtData.Add).
		missingPrivateData := blockAndPrivateData.MissingPvtData
		expectedMissingPvtData := make(ledger.TxMissingPvtData)
		expectedMissingPvtData.Add(0, "ns3", "c2", false)
		require.Equal(t, expectedMissingPvtData, missingPrivateData)
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	block := bf.AddTxn("tx1", "ns3", hash, "c2").create()
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	// Neither a fetcher nor a transient store is supplied in this test.
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            nil,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, nil, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, nil)
	require.NoError(t, err)
	assertCommitHappened()
}

// TestCoordinatorGetBlocks exercises GetPvtDataAndBlockByNum: a green path in
// which only the private data the requester is eligible for is returned, and a
// bad path in which the committer returns an error.
func TestCoordinatorGetBlocks(t *testing.T) {
	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	store := newTransientStore(t)
	defer store.tearDown()

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	fetcher := &fetcherMock{t: t}

	committer := &privdatamocks.Committer{}
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability :=
&privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	bf := &blockFactory{
		channelID: "testchannelid",
	}
	block := bf.AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()

	// Green path - block and private data is returned, but the requester isn't eligible for all the private data,
	// but only to a subset of it.
	cs := createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
		Namespace:  "ns1",
		Collection: "c2",
		Channel:    "testchannelid",
	}).withMSPIdentity(identity.GetMSPIdentifier())
	// Replace the mock state, dropping every expectation registered on the
	// committer so far, before registering the ones for this path.
	committer.Mock = mock.Mock{}
	committer.On("GetPvtDataAndBlockByNum", mock.Anything).Return(&ledger.BlockAndPvtData{
		Block:   block,
		PvtData: expectedCommittedPrivateData1,
	}, nil)
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	// Only ns1/c2 is accepted by the collection store above, so only that
	// collection is expected back.
	expectedPrivData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c2").create()
	block2, returnedPrivateData, err := coordinator.GetPvtDataAndBlockByNum(1, peerSelfSignedData)
	require.NoError(t, err)
	require.Equal(t, block, block2)
	require.Equal(t, expectedPrivData, []*ledger.TxPvtData(returnedPrivateData))

	// Bad path - error occurs when trying to retrieve the block and private data
	committer.Mock = mock.Mock{}
	committer.On("GetPvtDataAndBlockByNum", mock.Anything).Return(nil, errors.New("uh oh"))
	block2, returnedPrivateData, err = coordinator.GetPvtDataAndBlockByNum(1, peerSelfSignedData)
	require.Nil(t, block2)
	require.Empty(t, returnedPrivateData)
	require.Error(t, err)
}

// TestPurgeBelowHeight checks which transient store entries remain after
// StoreBlock commits a block, using a copy of testConfig whose
// TransientBlockRetention is lowered to 5.
func TestPurgeBelowHeight(t *testing.T) {
	conf := testConfig
conf.TransientBlockRetention = 5
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{}
	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll()

	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Return(nil)

	store := newTransientStore(t)
	defer store.tearDown()

	// store 9 data sets initially (tx1..tx9 at block heights 0..8)
	for i := 0; i < 9; i++ {
		txID := fmt.Sprintf("tx%d", i+1)
		store.Persist(txID, uint64(i), &tspb.TxPvtReadWriteSetWithConfigInfo{
			PvtRwset: &rwset.TxPvtReadWriteSet{
				NsPvtRwset: []*rwset.NsPvtReadWriteSet{
					{
						Namespace: "ns1",
						CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
							{
								CollectionName: "c1",
								Rwset:          []byte("rws-pre-image"),
							},
						},
					},
				},
			},
			CollectionConfigs: make(map[string]*peer.CollectionConfigPackage),
		})
	}

	// assertPurged reports whether the transient store is in the expected
	// state. With purged == false, tx1..tx9 must all still be present. With
	// purged == true, tx1..tx5 and tx10 must be gone while tx6..tx9 must
	// remain. It is polled through require.Eventually, which runs the condition
	// outside the test goroutine, so failures are reported with t.Errorf plus
	// a false result instead of t.Fatalf. The iterator is only closed when it
	// was obtained successfully.
	assertPurged := func(purged bool) bool {
		numTx := 9
		if purged {
			numTx = 10
		}
		for i := 1; i <= numTx; i++ {
			txID := fmt.Sprintf("tx%d", i)
			iterator, err := store.GetTxPvtRWSetByTxid(txID, nil)
			if err != nil {
				t.Errorf("Failed iterating, got err %s", err)
				return false
			}
			res, err := iterator.Next()
			iterator.Close()
			if err != nil {
				t.Errorf("Failed iterating, got err %s", err)
				return false
			}
			if (i < 6 || i == numTx) && purged {
				if res != nil {
					return false
				}
				continue
			}
			if res == nil {
				return false
			}
		}
		return true
	}

	fetcher := &fetcherMock{t: t}

	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	pdFactory := &pvtDataFactory{}
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, conf, idDeserializerFactory)

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	block := bf.AddTxn("tx10", "ns1", hash, "c1").create()
	block.Header.Number = 10
	pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1").create()
	// test no blocks purged yet
	assertPurgedBlocks := func() bool {
		return assertPurged(false)
	}
	require.Eventually(t, assertPurgedBlocks, 2*time.Second, 100*time.Millisecond)
	err := coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)
	// test that tx1-tx5 and the newly committed tx10 were purged, while tx6-tx9 remain
	assertPurgedBlocks = func() bool {
		return assertPurged(true)
	}
	require.Eventually(t, assertPurgedBlocks, 2*time.Second, 100*time.Millisecond)
}

// TestCoordinatorStorePvtData checks that StorePvtData succeeds for a single
// private write set when called with an explicit block height.
func TestCoordinatorStorePvtData(t *testing.T) {
	mspID := "Org1MSP"
	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
	cs := createcollectionStore(protoutil.SignedData{}).thatAcceptsAll()
	committer := &privdatamocks.Committer{}

	store := newTransientStore(t)
	defer store.tearDown()

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	fetcher := &fetcherMock{t: t}
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, protoutil.SignedData{}, metrics, testConfig, idDeserializerFactory)
	pvtData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c1").create()
	// Green path: ledger height can be retrieved from ledger/committer
	err := coordinator.StorePvtData("tx1", &tspb.TxPvtReadWriteSetWithConfigInfo{
		PvtRwset:          pvtData[0].WriteSet,
		CollectionConfigs: make(map[string]*peer.CollectionConfigPackage),
	}, uint64(5))
	require.NoError(t, err)
}

// TestContainsWrites checks containsWrites against a collection with a nil
// HashedRwSet, with an empty HashedRwSet, and with one hashed write.
func TestContainsWrites(t *testing.T) {
	// Scenario I: Nil HashedRwSet in collection
	col := &rwsetutil.CollHashedRwSet{
		CollectionName: "col1",
	}
	require.False(t, containsWrites("tx", "ns", col))

	// Scenario II: No writes in collection
	col.HashedRwSet = &kvrwset.HashedRWSet{}
	require.False(t, containsWrites("tx", "ns", col))

	// Scenario III: Some writes in collection
	col.HashedRwSet.HashedWrites = append(col.HashedRwSet.HashedWrites, &kvrwset.KVWriteHash{})
	require.True(t, containsWrites("tx", "ns", col))
}

// TestIgnoreReadOnlyColRWSets checks that StoreBlock commits a block whose
// only private-data transaction is read-only, with no private data and no
// missing private data passed to the committer.
func TestIgnoreReadOnlyColRWSets(t *testing.T) {
	// Scenario: The transaction has some ColRWSets that have only reads and no writes,
	// These should be ignored and not considered as missing private data that needs to be retrieved
	// from the transient store or other peers.
	// The gossip and transient store mocks in this test aren't initialized with
	// actions, so if the coordinator attempts to fetch private data from the
	// transient store or other peers, the test would fail.
	// Also - we check that at commit time - the coordinator concluded that
	// no missing private data was found.
err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}
	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())

	// commitHappened is flipped by the CommitLegacy mock below and reset once
	// it has been asserted.
	var commitHappened bool
	assertCommitHappened := func() {
		require.True(t, commitHappened)
		commitHappened = false
	}
	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
		// Ensure there is no private data to commit
		require.Empty(t, blockAndPrivateData.PvtData)
		// Ensure there is no missing private data
		require.Empty(t, blockAndPrivateData.MissingPvtData)
		commitHappened = true

		commitOpts := args.Get(1).(*ledger.CommitOptions)
		expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
		require.Equal(t, expectedCommitOpts, commitOpts)
	}).Return(nil)

	store := newTransientStore(t)
	defer store.tearDown()

	// No "fetch" expectation is registered on this fetcher mock.
	fetcher := &fetcherMock{t: t}
	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	// The block contains a read only private data transaction
	block := bf.AddReadOnlyTxn("tx1", "ns3", hash, "c3", "c2").create()
	committer.On("DoesPvtDataInfoExistInLedger",
mock.Anything).Return(false, nil)

	metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	// We pass a nil private data slice to indicate no pre-images though the block contains
	// private data reads.
	err = coordinator.StoreBlock(block, nil)
	require.NoError(t, err)
	assertCommitHappened()
}

// TestCoordinatorMetrics checks that a StoreBlock call which has to fetch one
// collection from a remote peer reports the validation, list-missing-private-data,
// fetch, commit-private-data and purge duration metrics, each labelled with
// the channel name.
func TestCoordinatorMetrics(t *testing.T) {
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
	identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
	require.NoError(t, err)
	serializedID, err := identity.Serialize()
	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
	mspID := "Org1MSP"
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}

	cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())

	committer := &privdatamocks.Committer{}
	committer.On("CommitLegacy", mock.Anything, mock.Anything).Return(nil)

	store := newTransientStore(t)
	defer store.tearDown()

	hash := util2.ComputeSHA256([]byte("rws-pre-image"))
	pdFactory := &pvtDataFactory{}
	bf := &blockFactory{
		channelID: "testchannelid",
	}

	idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
		return mspmgmt.GetManagerForChain("testchannelid")
	})

	// The block has three transactions, but private data is supplied only for
	// tx1 and tx2; tx3/ns3/c1 is served by the fetcher mock below.
	block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
		AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").
		AddTxnWithEndorsement("tx3", "ns3", hash, "org3", true, "c1").create()

	pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
	// fetch duration metric only reported when fetching from remote peer
	fetcher := &fetcherMock{t: t}
	fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
		{
			TxId: "tx3", Namespace: "ns3", Collection: "c1", BlockSeq: 1, SeqInBlock: 2,
		},
	}).Return(&privdatacommon.FetchedPvtDataContainer{
		AvailableElements: []*proto.PvtDataElement{
			{
				Digest: &proto.PvtDataDigest{
					SeqInBlock: 2,
					BlockSeq:   1,
					Collection: "c1",
					Namespace:  "ns3",
					TxId:       "tx3",
				},
				Payload: [][]byte{[]byte("rws-pre-image")},
			},
		},
	}, nil)

	// Metrics are built on the fake provider so that the reported label values
	// and observations can be inspected afterwards.
	testMetricProvider := gmetricsmocks.TestUtilConstructMetricProvider()
	metrics := metrics.NewGossipMetrics(testMetricProvider.FakeProvider).PrivdataMetrics
	committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)

	capabilityProvider := &privdatamocks.CapabilityProvider{}
	appCapability := &privdatamocks.ApplicationCapabilities{}
	capabilityProvider.On("Capabilities").Return(appCapability)
	appCapability.On("StorePvtDataOfInvalidTx").Return(true)
	coordinator := NewCoordinator(mspID, Support{
		ChainID:            "testchannelid",
		CollectionStore:    cs,
		Committer:          committer,
		Fetcher:            fetcher,
		Validator:          &validatorMock{},
		CapabilityProvider: capabilityProvider,
	}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
	err = coordinator.StoreBlock(block, pvtData)
	require.NoError(t, err)

	// make sure all coordinator metrics were reported

	require.Equal(t,
		[]string{"channel", "testchannelid"},
		testMetricProvider.FakeValidationDuration.WithArgsForCall(0),
	)
	require.True(t, testMetricProvider.FakeValidationDuration.ObserveArgsForCall(0) > 0)
	require.Equal(t,
		[]string{"channel", "testchannelid"},
		testMetricProvider.FakeListMissingPrivateDataDuration.WithArgsForCall(0),
	)
	require.True(t, testMetricProvider.FakeListMissingPrivateDataDuration.ObserveArgsForCall(0) > 0)
	require.Equal(t,
		[]string{"channel", "testchannelid"},
		testMetricProvider.FakeFetchDuration.WithArgsForCall(0),
	)
	// fetch duration metric only reported when fetching from remote peer
	require.True(t, testMetricProvider.FakeFetchDuration.ObserveArgsForCall(0) > 0)
	require.Equal(t,
		[]string{"channel", "testchannelid"},
		testMetricProvider.FakeCommitPrivateDataDuration.WithArgsForCall(0),
	)
	require.True(t, testMetricProvider.FakeCommitPrivateDataDuration.ObserveArgsForCall(0) > 0)
	require.Equal(t,
		[]string{"channel", "testchannelid"},
		testMetricProvider.FakePurgeDuration.WithArgsForCall(0),
	)

	// The purge observation is polled rather than asserted once.
	// NOTE(review): if the fake panics when call 0 has not been recorded yet,
	// this closure could panic before the purge has run - confirm, and consider
	// guarding on the fake's call count first.
	purgeDuration := func() bool {
		return testMetricProvider.FakePurgeDuration.ObserveArgsForCall(0) > 0
	}
	require.Eventually(t, purgeDuration, 2*time.Second, 100*time.Millisecond)
}