go_study/fabric-main/gossip/privdata/coordinator_test.go

1994 lines
70 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privdata
import (
"encoding/asn1"
"encoding/hex"
"errors"
"fmt"
"reflect"
"testing"
"time"
pb "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
proto "github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
mspproto "github.com/hyperledger/fabric-protos-go/msp"
"github.com/hyperledger/fabric-protos-go/peer"
tspb "github.com/hyperledger/fabric-protos-go/transientstore"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/metrics/disabled"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/metrics"
gmetricsmocks "github.com/hyperledger/fabric/gossip/metrics/mocks"
privdatacommon "github.com/hyperledger/fabric/gossip/privdata/common"
privdatamocks "github.com/hyperledger/fabric/gossip/privdata/mocks"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
msptesttools "github.com/hyperledger/fabric/msp/mgmt/testtools"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
var testConfig = CoordinatorConfig{
PullRetryThreshold: time.Second * 3,
TransientBlockRetention: 1000,
SkipPullingInvalidTransactions: false,
}
// CollectionCriteria aggregates criteria of
// a collection
type CollectionCriteria struct {
Channel string
Collection string
Namespace string
}
func fromCollectionCriteria(criteria privdata.CollectionCriteria) CollectionCriteria {
return CollectionCriteria{
Collection: criteria.Collection,
Namespace: criteria.Namespace,
Channel: criteria.Channel,
}
}
type validatorMock struct {
err error
}
func (v *validatorMock) Validate(block *common.Block) error {
if v.err != nil {
return v.err
}
return nil
}
type digests []privdatacommon.DigKey
func (d digests) Equal(other digests) bool {
flatten := func(d digests) map[privdatacommon.DigKey]struct{} {
m := map[privdatacommon.DigKey]struct{}{}
for _, dig := range d {
m[dig] = struct{}{}
}
return m
}
return reflect.DeepEqual(flatten(d), flatten(other))
}
type fetchCall struct {
fetcher *fetcherMock
*mock.Call
}
func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall {
if fc.fetcher.expectedEndorsers == nil {
fc.fetcher.expectedEndorsers = make(map[string]struct{})
}
for _, org := range orgs {
sID := &mspproto.SerializedIdentity{Mspid: org, IdBytes: []byte(fmt.Sprintf("p0%s", org))}
b, _ := pb.Marshal(sID)
fc.fetcher.expectedEndorsers[string(b)] = struct{}{}
}
return fc
}
func (fc *fetchCall) expectingDigests(digests []privdatacommon.DigKey) *fetchCall {
fc.fetcher.expectedDigests = digests
return fc
}
func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call {
return fc.Call.Return(returnArguments...)
}
type fetcherMock struct {
t *testing.T
mock.Mock
expectedDigests []privdatacommon.DigKey
expectedEndorsers map[string]struct{}
}
func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall {
return &fetchCall{
fetcher: f,
Call: f.Mock.On(methodName, arguments...),
}
}
func (f *fetcherMock) fetch(dig2src dig2sources) (*privdatacommon.FetchedPvtDataContainer, error) {
uniqueEndorsements := make(map[string]interface{})
for _, endorsements := range dig2src {
for _, endorsement := range endorsements {
_, exists := f.expectedEndorsers[string(endorsement.Endorser)]
if !exists {
f.t.Fatalf("Encountered a non-expected endorser: %s", string(endorsement.Endorser))
}
uniqueEndorsements[string(endorsement.Endorser)] = struct{}{}
}
}
require.True(f.t, digests(f.expectedDigests).Equal(digests(dig2src.keys())))
require.Equal(f.t, len(f.expectedEndorsers), len(uniqueEndorsements))
args := f.Called(dig2src)
if args.Get(1) == nil {
return args.Get(0).(*privdatacommon.FetchedPvtDataContainer), nil
}
return nil, args.Get(1).(error)
}
type testTransientStore struct {
storeProvider transientstore.StoreProvider
store *transientstore.Store
tempdir string
}
func newTransientStore(t *testing.T) *testTransientStore {
s := &testTransientStore{}
var err error
s.tempdir = t.TempDir()
s.storeProvider, err = transientstore.NewStoreProvider(s.tempdir)
if err != nil {
t.Fatalf("Failed to open store, got err %s", err)
return s
}
s.store, err = s.storeProvider.OpenStore("testchannelid")
if err != nil {
t.Fatalf("Failed to open store, got err %s", err)
return s
}
return s
}
func (s *testTransientStore) tearDown() {
s.storeProvider.Close()
}
func (s *testTransientStore) Persist(txid string, blockHeight uint64,
privateSimulationResultsWithConfig *tspb.TxPvtReadWriteSetWithConfigInfo) error {
return s.store.Persist(txid, blockHeight, privateSimulationResultsWithConfig)
}
func (s *testTransientStore) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error) {
return s.store.GetTxPvtRWSetByTxid(txid, filter)
}
func createcollectionStore(expectedSignedData protoutil.SignedData) *collectionStore {
return &collectionStore{
expectedSignedData: expectedSignedData,
policies: make(map[collectionAccessPolicy]CollectionCriteria),
store: make(map[CollectionCriteria]collectionAccessPolicy),
}
}
type collectionStore struct {
expectedSignedData protoutil.SignedData
acceptsAll bool
acceptsNone bool
lenient bool
mspIdentifier string
store map[CollectionCriteria]collectionAccessPolicy
policies map[collectionAccessPolicy]CollectionCriteria
}
func (cs *collectionStore) thatAcceptsAll() *collectionStore {
cs.acceptsAll = true
return cs
}
func (cs *collectionStore) thatAcceptsNone() *collectionStore {
cs.acceptsNone = true
return cs
}
func (cs *collectionStore) thatAccepts(cc CollectionCriteria) *collectionStore {
sp := collectionAccessPolicy{
cs: cs,
n: util.RandomUInt64(),
}
cs.store[cc] = sp
cs.policies[sp] = cc
return cs
}
func (cs *collectionStore) withMSPIdentity(identifier string) *collectionStore {
cs.mspIdentifier = identifier
return cs
}
func (cs *collectionStore) RetrieveCollectionAccessPolicy(cc privdata.CollectionCriteria) (privdata.CollectionAccessPolicy, error) {
if sp, exists := cs.store[fromCollectionCriteria(cc)]; exists {
return &sp, nil
}
if cs.acceptsAll || cs.acceptsNone || cs.lenient {
return &collectionAccessPolicy{
cs: cs,
n: util.RandomUInt64(),
}, nil
}
return nil, privdata.NoSuchCollectionError{}
}
func (cs *collectionStore) RetrieveCollection(privdata.CollectionCriteria) (privdata.Collection, error) {
panic("implement me")
}
func (cs *collectionStore) RetrieveCollectionConfig(cc privdata.CollectionCriteria) (*peer.StaticCollectionConfig, error) {
mspIdentifier := "different-org"
if _, exists := cs.store[fromCollectionCriteria(cc)]; exists || cs.acceptsAll {
mspIdentifier = cs.mspIdentifier
}
return &peer.StaticCollectionConfig{
Name: cc.Collection,
MemberOnlyRead: true,
MemberOrgsPolicy: &peer.CollectionPolicyConfig{
Payload: &peer.CollectionPolicyConfig_SignaturePolicy{
SignaturePolicy: &common.SignaturePolicyEnvelope{
Rule: &common.SignaturePolicy{
Type: &common.SignaturePolicy_SignedBy{
SignedBy: 0,
},
},
Identities: []*mspproto.MSPPrincipal{
{
PrincipalClassification: mspproto.MSPPrincipal_ROLE,
Principal: protoutil.MarshalOrPanic(&mspproto.MSPRole{
MspIdentifier: mspIdentifier,
Role: mspproto.MSPRole_MEMBER,
}),
},
},
},
},
},
}, nil
}
func (cs *collectionStore) RetrieveReadWritePermission(cc privdata.CollectionCriteria, sp *peer.SignedProposal, qe ledger.QueryExecutor) (bool, bool, error) {
panic("implement me")
}
func (cs *collectionStore) RetrieveCollectionConfigPackage(cc privdata.CollectionCriteria) (*peer.CollectionConfigPackage, error) {
return &peer.CollectionConfigPackage{
Config: []*peer.CollectionConfig{
{
Payload: &peer.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &peer.StaticCollectionConfig{
Name: cc.Collection,
MaximumPeerCount: 1,
RequiredPeerCount: 1,
},
},
},
},
}, nil
}
func (cs *collectionStore) RetrieveCollectionPersistenceConfigs(cc privdata.CollectionCriteria) (privdata.CollectionPersistenceConfigs, error) {
panic("implement me")
}
func (cs *collectionStore) AccessFilter(channelName string, collectionPolicyConfig *peer.CollectionPolicyConfig) (privdata.Filter, error) {
panic("implement me")
}
type collectionAccessPolicy struct {
cs *collectionStore
n uint64
}
func (cap *collectionAccessPolicy) MemberOrgs() map[string]struct{} {
return map[string]struct{}{
"org0": {},
"org1": {},
}
}
func (cap *collectionAccessPolicy) RequiredPeerCount() int {
return 1
}
func (cap *collectionAccessPolicy) MaximumPeerCount() int {
return 2
}
func (cap *collectionAccessPolicy) IsMemberOnlyRead() bool {
return false
}
func (cap *collectionAccessPolicy) IsMemberOnlyWrite() bool {
return false
}
func (cap *collectionAccessPolicy) AccessFilter() privdata.Filter {
return func(sd protoutil.SignedData) bool {
that, _ := asn1.Marshal(sd)
this, _ := asn1.Marshal(cap.cs.expectedSignedData)
if hex.EncodeToString(that) != hex.EncodeToString(this) {
panic(fmt.Errorf("self signed data passed isn't equal to expected:%v, %v", sd, cap.cs.expectedSignedData))
}
if cap.cs.acceptsNone {
return false
} else if cap.cs.acceptsAll {
return true
}
_, exists := cap.cs.policies[*cap]
return exists
}
}
func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
collection := &util.PvtDataCollections{
&ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{1, 2, 3, 4, 5, 6, 7},
},
},
},
},
},
},
nil,
}
_, err := collection.Marshal()
assertion := require.New(t)
assertion.Error(err, "Expected to fail since second item has nil payload")
assertion.Equal("Mallformed private data payload, rwset index 1 is nil", fmt.Sprintf("%s", err))
}
func TestPvtDataCollections_FailMarshalingWriteSet(t *testing.T) {
collection := &util.PvtDataCollections{
&ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: nil,
},
}
_, err := collection.Marshal()
assertion := require.New(t)
assertion.Error(err, "Expected to fail since first item has nil writeset")
assertion.Contains(fmt.Sprintf("%s", err), "Could not marshal private rwset index 0")
}
func TestPvtDataCollections_Marshal(t *testing.T) {
collection := &util.PvtDataCollections{
&ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{1, 2, 3, 4, 5, 6, 7},
},
},
},
},
},
},
&ledger.TxPvtData{
SeqInBlock: uint64(2),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{42, 42, 42, 42, 42, 42, 42},
},
},
},
{
Namespace: "ns2",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "otherCollection",
Rwset: []byte{10, 9, 8, 7, 6, 5, 4, 3, 2, 1},
},
},
},
},
},
},
}
bytes, err := collection.Marshal()
assertion := require.New(t)
assertion.NoError(err)
assertion.NotNil(bytes)
assertion.Equal(2, len(bytes))
}
func TestPvtDataCollections_Unmarshal(t *testing.T) {
collection := util.PvtDataCollections{
&ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{1, 2, 3, 4, 5, 6, 7},
},
},
},
},
},
},
}
bytes, err := collection.Marshal()
assertion := require.New(t)
assertion.NoError(err)
assertion.NotNil(bytes)
assertion.Equal(1, len(bytes))
var newCol util.PvtDataCollections
err = newCol.Unmarshal(bytes)
assertion.NoError(err)
assertion.Equal(1, len(newCol))
assertion.Equal(newCol[0].SeqInBlock, collection[0].SeqInBlock)
assertion.True(pb.Equal(newCol[0].WriteSet, collection[0].WriteSet))
}
type rwsTriplet struct {
namespace string
collection string
rwset string
}
func flattenTxPvtDataMap(pd ledger.TxPvtDataMap) map[uint64]map[rwsTriplet]struct{} {
m := make(map[uint64]map[rwsTriplet]struct{})
for seqInBlock, namespaces := range pd {
triplets := make(map[rwsTriplet]struct{})
for _, namespace := range namespaces.WriteSet.NsPvtRwset {
for _, col := range namespace.CollectionPvtRwset {
triplets[rwsTriplet{
namespace: namespace.Namespace,
collection: col.CollectionName,
rwset: hex.EncodeToString(col.Rwset),
}] = struct{}{}
}
}
m[seqInBlock] = triplets
}
return m
}
var expectedCommittedPrivateData1 = map[uint64]*ledger.TxPvtData{
0: {SeqInBlock: 0, WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "c1",
Rwset: []byte("rws-pre-image"),
},
{
CollectionName: "c2",
Rwset: []byte("rws-pre-image"),
},
},
},
},
}},
1: {SeqInBlock: 1, WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns2",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "c1",
Rwset: []byte("rws-pre-image"),
},
},
},
},
}},
}
var expectedCommittedPrivateData2 = map[uint64]*ledger.TxPvtData{
0: {SeqInBlock: 0, WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns3",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "c3",
Rwset: []byte("rws-pre-image"),
},
},
},
},
}},
}
var expectedCommittedPrivateData3 = map[uint64]*ledger.TxPvtData{}
func TestCoordinatorStoreInvalidBlock(t *testing.T) {
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
t.Fatal("Shouldn't have committed")
}).Return(nil)
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
store := newTransientStore(t)
defer store.tearDown()
assertPurged := func(txns ...string) {
for _, txn := range txns {
iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
res, err := iterator.Next()
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
require.Nil(t, res)
iterator.Close()
}
}
fetcher := &fetcherMock{t: t}
pdFactory := &pvtDataFactory{}
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
block := bf.withoutMetadata().create()
// Scenario I: Block we got doesn't have any metadata with it
pvtData := pdFactory.create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.Error(t, err)
require.Contains(t, err.Error(), "Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
// Scenario II: Validator has an error while validating the block
block = bf.create()
pvtData = pdFactory.create()
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{fmt.Errorf("failed validating block")},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.Error(t, err)
require.Contains(t, err.Error(), "failed validating block")
// Scenario III: Block we got contains an inadequate length of Tx filter in the metadata
block = bf.withMetadataSize(100).create()
pvtData = pdFactory.create()
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.Error(t, err)
require.Contains(t, err.Error(), "block data size")
require.Contains(t, err.Error(), "is different from Tx filter size")
// Scenario IV: The second transaction in the block we got is invalid, and we have no private data for that.
// As the StorePvtDataOfInvalidTx is set of false, if the coordinator would try to fetch private data, the
// test would fall because we haven't defined the mock operations for the transientstore (or for gossip)
// in this test.
var commitHappened bool
assertCommitHappened := func() {
require.True(t, commitHappened)
commitHappened = false
}
digKeys := []privdatacommon.DigKey{
{
TxId: "tx2",
Namespace: "ns2",
Collection: "c1",
BlockSeq: 1,
SeqInBlock: 1,
},
}
fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests(digKeys).expectingEndorsers(identity.GetMSPIdentifier()).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: nil,
}, nil)
committer = &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
commitHappened = true
// Only the first transaction's private data is passed to the ledger
require.Len(t, privateDataPassed2Ledger, 1)
require.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
// The private data passed to the ledger contains "ns1" and has 2 collections in it
require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
require.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)
}).Return(nil)
block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider = &privdatamocks.CapabilityProvider{}
appCapability = &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(false)
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
// Ensure the 2nd transaction which is invalid and wasn't committed - is still purged.
// This is so that if we get a transaction via dissemination from an endorser, we purge it
// when its block comes.
assertPurged("tx1", "tx2")
// Scenario V: The second transaction in the block we got is invalid, and we have no private
// data for that in the transient store. As we have set StorePvtDataOfInvalidTx to true and
// configured the coordinator to skip pulling pvtData of invalid transactions from other peers,
// it should not store the pvtData of invalid transaction in the ledger instead a missing entry.
testConfig.SkipPullingInvalidTransactions = true
assertCommitHappened = func() {
require.True(t, commitHappened)
commitHappened = false
}
committer = &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
blockAndPvtData := args.Get(0).(*ledger.BlockAndPvtData)
commitHappened = true
// Only the first transaction's private data is passed to the ledger
privateDataPassed2Ledger := blockAndPvtData.PvtData
require.Len(t, privateDataPassed2Ledger, 1)
require.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
// The private data passed to the ledger contains "ns1" and has 2 collections in it
require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
require.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)
missingPrivateDataPassed2Ledger := blockAndPvtData.MissingPvtData
require.Len(t, missingPrivateDataPassed2Ledger, 1)
require.Len(t, missingPrivateDataPassed2Ledger[1], 1)
require.Equal(t, missingPrivateDataPassed2Ledger[1][0].Namespace, "ns2")
require.Equal(t, missingPrivateDataPassed2Ledger[1][0].Collection, "c1")
require.Equal(t, missingPrivateDataPassed2Ledger[1][0].IsEligible, true)
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider = &privdatamocks.CapabilityProvider{}
appCapability = &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
digKeys = []privdatacommon.DigKey{}
fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests(digKeys).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: nil,
}, nil)
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurged("tx1", "tx2")
// Scenario VI: The second transaction in the block we got is invalid. As we have set the
// StorePvtDataOfInvalidTx to true and configured the coordinator to pull pvtData of invalid
// transactions, it should store the pvtData of invalid transactions in the ledger.
testConfig.SkipPullingInvalidTransactions = false
committer = &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
blockAndPvtData := args.Get(0).(*ledger.BlockAndPvtData)
commitHappened = true
// pvtData of both transactions must be present though the second transaction
// is invalid.
privateDataPassed2Ledger := blockAndPvtData.PvtData
require.Len(t, privateDataPassed2Ledger, 2)
require.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
require.Equal(t, 1, int(privateDataPassed2Ledger[1].SeqInBlock))
// The private data passed to the ledger for tx1 contains "ns1" and has 2 collections in it
require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
require.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
require.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)
// The private data passed to the ledger for tx2 contains "ns2" and has 1 collection in it
require.Len(t, privateDataPassed2Ledger[1].WriteSet.NsPvtRwset, 1)
require.Equal(t, "ns2", privateDataPassed2Ledger[1].WriteSet.NsPvtRwset[0].Namespace)
require.Len(t, privateDataPassed2Ledger[1].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 1)
missingPrivateDataPassed2Ledger := blockAndPvtData.MissingPvtData
require.Len(t, missingPrivateDataPassed2Ledger, 0)
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
{
TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1,
},
}).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: []*proto.PvtDataElement{
{
Digest: &proto.PvtDataDigest{
SeqInBlock: 1,
BlockSeq: 1,
Collection: "c1",
Namespace: "ns2",
TxId: "tx2",
},
Payload: [][]byte{[]byte("rws-pre-image")},
},
},
}, nil)
block = bf.withInvalidTxns(1).AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurged("tx1", "tx2")
// Scenario VII: Block doesn't contain a header
block.Header = nil
err = coordinator.StoreBlock(block, pvtData)
require.Error(t, err)
require.Contains(t, err.Error(), "Block header is nil")
// Scenario VIII: Block doesn't contain Data
block.Data = nil
err = coordinator.StoreBlock(block, pvtData)
require.Error(t, err)
require.Contains(t, err.Error(), "Block data is empty")
}
func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
/*
Test case, where peer receives new block for commit
it has ns1:c1 in transient store, while it has wrong
hash, hence it will fetch ns1:c1 from other peers
*/
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
expectedPvtData := map[uint64]*ledger.TxPvtData{
0: {SeqInBlock: 0, WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "c1",
Rwset: []byte("rws-original"),
},
},
},
},
}},
}
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
committer := &privdatamocks.Committer{}
store := newTransientStore(t)
defer store.tearDown()
assertPurged := func(txns ...string) {
for _, txn := range txns {
iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
res, err := iterator.Next()
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
require.Nil(t, res)
iterator.Close()
}
}
fetcher := &fetcherMock{t: t}
var commitHappened bool
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
flattenTxPvtDataMap(expectedPvtData)))
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
hash := util2.ComputeSHA256([]byte("rws-original"))
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
{
TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1,
},
}).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: []*proto.PvtDataElement{
{
Digest: &proto.PvtDataDigest{
BlockSeq: 1,
Collection: "c1",
Namespace: "ns1",
TxId: "tx1",
},
Payload: [][]byte{[]byte("rws-original")},
},
},
}, nil)
coordinator.StoreBlock(block, nil)
// Assert blocks was eventually committed
require.True(t, commitHappened)
// Assert transaction has been purged
assertPurged("tx1")
}
func TestCoordinatorStoreBlock(t *testing.T) {
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
// Green path test, all private data should be obtained successfully
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
var commitHappened bool
assertCommitHappened := func() {
require.True(t, commitHappened)
commitHappened = false
}
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
flattenTxPvtDataMap(expectedCommittedPrivateData1)))
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
store := newTransientStore(t)
defer store.tearDown()
assertPurged := func(txns ...string) bool {
for _, txn := range txns {
iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
if err != nil {
iterator.Close()
t.Fatalf("Failed iterating, got err %s", err)
}
res, err := iterator.Next()
iterator.Close()
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
if res != nil {
return false
}
}
return true
}
fetcher := &fetcherMock{t: t}
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
pdFactory := &pvtDataFactory{}
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
fmt.Println("Scenario I")
// Scenario I: Block we got has sufficient private data alongside it.
// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurgeTxs := func() bool {
return assertPurged("tx1", "tx2")
}
require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
fmt.Println("Scenario II")
// Scenario II: Block we got doesn't have sufficient private data alongside it,
// it is missing ns1: c2, but the data exists in the transient store
store.Persist("tx1", 1, &tspb.TxPvtReadWriteSetWithConfigInfo{
PvtRwset: &rwset.TxPvtReadWriteSet{
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "c2",
Rwset: []byte("rws-pre-image"),
},
},
},
},
},
CollectionConfigs: make(map[string]*peer.CollectionConfigPackage),
})
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1").addRWSet().addNSRWSet("ns2", "c1").create()
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurgeTxs = func() bool {
return assertPurged("tx1", "tx2")
}
require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
fmt.Println("Scenario III")
// Scenario III: Block doesn't have sufficient private data alongside it,
// it is missing ns1: c2, and the data exists in the transient store,
// but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer.
// Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since
// the MemberOrgs() call doesn't return org2 but only org0 and org1.
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
{
TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1,
},
{
TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1,
},
}).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: []*proto.PvtDataElement{
{
Digest: &proto.PvtDataDigest{
BlockSeq: 1,
Collection: "c2",
Namespace: "ns1",
TxId: "tx1",
},
Payload: [][]byte{[]byte("rws-pre-image")},
},
{
Digest: &proto.PvtDataDigest{
SeqInBlock: 1,
BlockSeq: 1,
Collection: "c1",
Namespace: "ns2",
TxId: "tx2",
},
Payload: [][]byte{[]byte("rws-pre-image")},
},
},
}, nil)
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1").create()
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurgeTxs = func() bool {
return assertPurged("tx1", "tx2")
}
require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
fmt.Println("Scenario IV")
// Scenario IV: Block came with more than sufficient private data alongside it, some of it is redundant.
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2", "c3").
addRWSet().addNSRWSet("ns2", "c1", "c3").addRWSet().addNSRWSet("ns1", "c4").create()
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurgeTxs = func() bool {
return assertPurged("tx1", "tx2")
}
require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
fmt.Println("Scenario V")
// Scenario V: Block we got has private data alongside it but coordinator cannot retrieve collection access
// policy of collections due to databse unavailability error.
// we verify that the error propagates properly.
mockCs := &privdatamocks.CollectionStore{}
mockCs.On("RetrieveCollectionConfig", mock.Anything).Return(nil, errors.New("test error"))
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: mockCs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, nil)
require.Error(t, err)
require.Equal(t, "test error", err.Error())
fmt.Println("Scenario VI")
// Scenario VI: Block didn't get with any private data alongside it, and the transient store
// has some problem.
// In this case, we should try to fetch data from peers.
block = bf.AddTxn("tx3", "ns3", hash, "c3").create()
fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
{
TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1,
},
}).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: []*proto.PvtDataElement{
{
Digest: &proto.PvtDataDigest{
BlockSeq: 1,
Collection: "c3",
Namespace: "ns3",
TxId: "tx3",
},
Payload: [][]byte{[]byte("rws-pre-image")},
},
},
}, nil)
committer = &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
flattenTxPvtDataMap(expectedCommittedPrivateData2)))
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, nil)
require.NoError(t, err)
assertCommitHappened()
assertPurgeTxs = func() bool {
return assertPurged("tx3")
}
require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
fmt.Println("Scenario VII")
// Scenario VII: Block contains 2 transactions, and the peer is eligible for only tx3-ns3-c3.
// Also, the blocks comes with a private data for tx3-ns3-c3 so that the peer won't have to fetch the
// private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible
// for from the transient store or from peers - the test would fail because the Mock wasn't initialized.
block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
cs = createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
Collection: "c3",
Namespace: "ns3",
Channel: "testchannelid",
}).withMSPIdentity(identity.GetMSPIdentifier())
fetcher = &fetcherMock{t: t}
committer = &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
flattenTxPvtDataMap(expectedCommittedPrivateData2)))
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
coordinator = NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
pvtData = pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
// In any case, all transactions in the block are purged from the transient store
assertPurgeTxs = func() bool {
return assertPurged("tx3", "tx1")
}
require.Eventually(t, assertPurgeTxs, 2*time.Second, 100*time.Millisecond)
}
func TestCoordinatorStoreBlockWhenPvtDataExistInLedger(t *testing.T) {
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
var commitHappened bool
assertCommitHappened := func() {
require.True(t, commitHappened)
commitHappened = false
}
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
privateDataPassed2Ledger := args.Get(0).(*ledger.BlockAndPvtData).PvtData
require.Equal(t, ledger.TxPvtDataMap{}, privateDataPassed2Ledger)
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: true}
require.Equal(t, expectedCommitOpts, commitOpts)
commitHappened = true
}).Return(nil)
fetcher := &fetcherMock{t: t}
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
pdFactory := &pvtDataFactory{}
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()
// Scenario: Block we got has been reprocessed and hence the sufficient pvtData is present
// in the local pvtdataStore itself. The pvtData would be fetched from the local pvtdataStore.
// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(true, nil)
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: nil,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, nil, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
}
func TestProceedWithoutPrivateData(t *testing.T) {
// Scenario: we are missing private data (c2 in ns3) and it cannot be obtained from any peer.
// Block needs to be committed with missing private data.
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
var commitHappened bool
assertCommitHappened := func() {
require.True(t, commitHappened)
commitHappened = false
}
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
privateDataPassed2Ledger := blockAndPrivateData.PvtData
require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
flattenTxPvtDataMap(expectedCommittedPrivateData2)))
missingPrivateData := blockAndPrivateData.MissingPvtData
expectedMissingPvtData := make(ledger.TxMissingPvtData)
expectedMissingPvtData.Add(0, "ns3", "c2", true)
require.Equal(t, expectedMissingPvtData, missingPrivateData)
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
store := newTransientStore(t)
defer store.tearDown()
assertPurged := func(txns ...string) {
for _, txn := range txns {
iterator, err := store.GetTxPvtRWSetByTxid(txn, nil)
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
res, err := iterator.Next()
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
require.Nil(t, res)
iterator.Close()
}
}
fetcher := &fetcherMock{t: t}
// Have the peer return in response to the pull, a private data with a non matching hash
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
{
TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1,
},
}).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: []*proto.PvtDataElement{
{
Digest: &proto.PvtDataDigest{
BlockSeq: 1,
Collection: "c2",
Namespace: "ns3",
TxId: "tx1",
},
Payload: [][]byte{[]byte("wrong pre-image")},
},
},
}, nil)
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
pdFactory := &pvtDataFactory{}
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
block := bf.AddTxn("tx1", "ns3", hash, "c3", "c2").create()
pvtData := pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
assertCommitHappened()
assertPurged("tx1")
}
func TestProceedWithInEligiblePrivateData(t *testing.T) {
// Scenario: we are missing private data (c2 in ns3) and it cannot be obtained from any peer.
// Block needs to be committed with missing private data.
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
cs := createcollectionStore(peerSelfSignedData).thatAcceptsNone().withMSPIdentity(identity.GetMSPIdentifier())
var commitHappened bool
assertCommitHappened := func() {
require.True(t, commitHappened)
commitHappened = false
}
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
privateDataPassed2Ledger := blockAndPrivateData.PvtData
require.True(t, reflect.DeepEqual(flattenTxPvtDataMap(privateDataPassed2Ledger),
flattenTxPvtDataMap(expectedCommittedPrivateData3)))
missingPrivateData := blockAndPrivateData.MissingPvtData
expectedMissingPvtData := make(ledger.TxMissingPvtData)
expectedMissingPvtData.Add(0, "ns3", "c2", false)
require.Equal(t, expectedMissingPvtData, missingPrivateData)
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
block := bf.AddTxn("tx1", "ns3", hash, "c2").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: nil,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, nil, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, nil)
require.NoError(t, err)
assertCommitHappened()
}
func TestCoordinatorGetBlocks(t *testing.T) {
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
store := newTransientStore(t)
defer store.tearDown()
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
fetcher := &fetcherMock{t: t}
committer := &privdatamocks.Committer{}
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
bf := &blockFactory{
channelID: "testchannelid",
}
block := bf.AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
// Green path - block and private data is returned, but the requester isn't eligible for all the private data,
// but only to a subset of it.
cs := createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
Namespace: "ns1",
Collection: "c2",
Channel: "testchannelid",
}).withMSPIdentity(identity.GetMSPIdentifier())
committer.Mock = mock.Mock{}
committer.On("GetPvtDataAndBlockByNum", mock.Anything).Return(&ledger.BlockAndPvtData{
Block: block,
PvtData: expectedCommittedPrivateData1,
}, nil)
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
expectedPrivData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c2").create()
block2, returnedPrivateData, err := coordinator.GetPvtDataAndBlockByNum(1, peerSelfSignedData)
require.NoError(t, err)
require.Equal(t, block, block2)
require.Equal(t, expectedPrivData, []*ledger.TxPvtData(returnedPrivateData))
// Bad path - error occurs when trying to retrieve the block and private data
committer.Mock = mock.Mock{}
committer.On("GetPvtDataAndBlockByNum", mock.Anything).Return(nil, errors.New("uh oh"))
block2, returnedPrivateData, err = coordinator.GetPvtDataAndBlockByNum(1, peerSelfSignedData)
require.Nil(t, block2)
require.Empty(t, returnedPrivateData)
require.Error(t, err)
}
func TestPurgeBelowHeight(t *testing.T) {
conf := testConfig
conf.TransientBlockRetention = 5
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{}
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll()
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Return(nil)
store := newTransientStore(t)
defer store.tearDown()
// store 9 data sets initially
for i := 0; i < 9; i++ {
txID := fmt.Sprintf("tx%d", i+1)
store.Persist(txID, uint64(i), &tspb.TxPvtReadWriteSetWithConfigInfo{
PvtRwset: &rwset.TxPvtReadWriteSet{
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "c1",
Rwset: []byte("rws-pre-image"),
},
},
},
},
},
CollectionConfigs: make(map[string]*peer.CollectionConfigPackage),
})
}
assertPurged := func(purged bool) bool {
numTx := 9
if purged {
numTx = 10
}
for i := 1; i <= numTx; i++ {
txID := fmt.Sprintf("tx%d", i)
iterator, err := store.GetTxPvtRWSetByTxid(txID, nil)
if err != nil {
iterator.Close()
t.Fatalf("Failed iterating, got err %s", err)
}
res, err := iterator.Next()
iterator.Close()
if err != nil {
t.Fatalf("Failed iterating, got err %s", err)
}
if (i < 6 || i == numTx) && purged {
if res != nil {
return false
}
continue
}
if res == nil {
return false
}
}
return true
}
fetcher := &fetcherMock{t: t}
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
pdFactory := &pvtDataFactory{}
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, conf, idDeserializerFactory)
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
block := bf.AddTxn("tx10", "ns1", hash, "c1").create()
block.Header.Number = 10
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1").create()
// test no blocks purged yet
assertPurgedBlocks := func() bool {
return assertPurged(false)
}
require.Eventually(t, assertPurgedBlocks, 2*time.Second, 100*time.Millisecond)
err := coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
// test first 6 blocks were purged
assertPurgedBlocks = func() bool {
return assertPurged(true)
}
require.Eventually(t, assertPurgedBlocks, 2*time.Second, 100*time.Millisecond)
}
func TestCoordinatorStorePvtData(t *testing.T) {
mspID := "Org1MSP"
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
cs := createcollectionStore(protoutil.SignedData{}).thatAcceptsAll()
committer := &privdatamocks.Committer{}
store := newTransientStore(t)
defer store.tearDown()
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
fetcher := &fetcherMock{t: t}
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, protoutil.SignedData{}, metrics, testConfig, idDeserializerFactory)
pvtData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c1").create()
// Green path: ledger height can be retrieved from ledger/committer
err := coordinator.StorePvtData("tx1", &tspb.TxPvtReadWriteSetWithConfigInfo{
PvtRwset: pvtData[0].WriteSet,
CollectionConfigs: make(map[string]*peer.CollectionConfigPackage),
}, uint64(5))
require.NoError(t, err)
}
func TestContainsWrites(t *testing.T) {
// Scenario I: Nil HashedRwSet in collection
col := &rwsetutil.CollHashedRwSet{
CollectionName: "col1",
}
require.False(t, containsWrites("tx", "ns", col))
// Scenario II: No writes in collection
col.HashedRwSet = &kvrwset.HashedRWSet{}
require.False(t, containsWrites("tx", "ns", col))
// Scenario III: Some writes in collection
col.HashedRwSet.HashedWrites = append(col.HashedRwSet.HashedWrites, &kvrwset.KVWriteHash{})
require.True(t, containsWrites("tx", "ns", col))
}
func TestIgnoreReadOnlyColRWSets(t *testing.T) {
// Scenario: The transaction has some ColRWSets that have only reads and no writes,
// These should be ignored and not considered as missing private data that needs to be retrieved
// from the transient store or other peers.
// The gossip and transient store mocks in this test aren't initialized with
// actions, so if the coordinator attempts to fetch private data from the
// transient store or other peers, the test would fail.
// Also - we check that at commit time - the coordinator concluded that
// no missing private data was found.
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
var commitHappened bool
assertCommitHappened := func() {
require.True(t, commitHappened)
commitHappened = false
}
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
// Ensure there is no private data to commit
require.Empty(t, blockAndPrivateData.PvtData)
// Ensure there is no missing private data
require.Empty(t, blockAndPrivateData.MissingPvtData)
commitHappened = true
commitOpts := args.Get(1).(*ledger.CommitOptions)
expectedCommitOpts := &ledger.CommitOptions{FetchPvtDataFromLedger: false}
require.Equal(t, expectedCommitOpts, commitOpts)
}).Return(nil)
store := newTransientStore(t)
defer store.tearDown()
fetcher := &fetcherMock{t: t}
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
// The block contains a read only private data transaction
block := bf.AddReadOnlyTxn("tx1", "ns3", hash, "c3", "c2").create()
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
metrics := metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
// We pass a nil private data slice to indicate no pre-images though the block contains
// private data reads.
err = coordinator.StoreBlock(block, nil)
require.NoError(t, err)
assertCommitHappened()
}
func TestCoordinatorMetrics(t *testing.T) {
err := msptesttools.LoadMSPSetupForTesting()
require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
identity, err := mspmgmt.GetLocalMSP(factory.GetDefault()).GetDefaultSigningIdentity()
require.NoError(t, err)
serializedID, err := identity.Serialize()
require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
data := []byte{1, 2, 3}
signature, err := identity.Sign(data)
require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
mspID := "Org1MSP"
peerSelfSignedData := protoutil.SignedData{
Identity: serializedID,
Signature: signature,
Data: data,
}
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll().withMSPIdentity(identity.GetMSPIdentifier())
committer := &privdatamocks.Committer{}
committer.On("CommitLegacy", mock.Anything, mock.Anything).Return(nil)
store := newTransientStore(t)
defer store.tearDown()
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
pdFactory := &pvtDataFactory{}
bf := &blockFactory{
channelID: "testchannelid",
}
idDeserializerFactory := IdentityDeserializerFactoryFunc(func(chainID string) msp.IdentityDeserializer {
return mspmgmt.GetManagerForChain("testchannelid")
})
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").
AddTxnWithEndorsement("tx3", "ns3", hash, "org3", true, "c1").create()
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
// fetch duration metric only reported when fetching from remote peer
fetcher := &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
{
TxId: "tx3", Namespace: "ns3", Collection: "c1", BlockSeq: 1, SeqInBlock: 2,
},
}).Return(&privdatacommon.FetchedPvtDataContainer{
AvailableElements: []*proto.PvtDataElement{
{
Digest: &proto.PvtDataDigest{
SeqInBlock: 2,
BlockSeq: 1,
Collection: "c1",
Namespace: "ns3",
TxId: "tx3",
},
Payload: [][]byte{[]byte("rws-pre-image")},
},
},
}, nil)
testMetricProvider := gmetricsmocks.TestUtilConstructMetricProvider()
metrics := metrics.NewGossipMetrics(testMetricProvider.FakeProvider).PrivdataMetrics
committer.On("DoesPvtDataInfoExistInLedger", mock.Anything).Return(false, nil)
capabilityProvider := &privdatamocks.CapabilityProvider{}
appCapability := &privdatamocks.ApplicationCapabilities{}
capabilityProvider.On("Capabilities").Return(appCapability)
appCapability.On("StorePvtDataOfInvalidTx").Return(true)
coordinator := NewCoordinator(mspID, Support{
ChainID: "testchannelid",
CollectionStore: cs,
Committer: committer,
Fetcher: fetcher,
Validator: &validatorMock{},
CapabilityProvider: capabilityProvider,
}, store.store, peerSelfSignedData, metrics, testConfig, idDeserializerFactory)
err = coordinator.StoreBlock(block, pvtData)
require.NoError(t, err)
// make sure all coordinator metrics were reported
require.Equal(t,
[]string{"channel", "testchannelid"},
testMetricProvider.FakeValidationDuration.WithArgsForCall(0),
)
require.True(t, testMetricProvider.FakeValidationDuration.ObserveArgsForCall(0) > 0)
require.Equal(t,
[]string{"channel", "testchannelid"},
testMetricProvider.FakeListMissingPrivateDataDuration.WithArgsForCall(0),
)
require.True(t, testMetricProvider.FakeListMissingPrivateDataDuration.ObserveArgsForCall(0) > 0)
require.Equal(t,
[]string{"channel", "testchannelid"},
testMetricProvider.FakeFetchDuration.WithArgsForCall(0),
)
// fetch duration metric only reported when fetching from remote peer
require.True(t, testMetricProvider.FakeFetchDuration.ObserveArgsForCall(0) > 0)
require.Equal(t,
[]string{"channel", "testchannelid"},
testMetricProvider.FakeCommitPrivateDataDuration.WithArgsForCall(0),
)
require.True(t, testMetricProvider.FakeCommitPrivateDataDuration.ObserveArgsForCall(0) > 0)
require.Equal(t,
[]string{"channel", "testchannelid"},
testMetricProvider.FakePurgeDuration.WithArgsForCall(0),
)
purgeDuration := func() bool {
return testMetricProvider.FakePurgeDuration.ObserveArgsForCall(0) > 0
}
require.Eventually(t, purgeDuration, 2*time.Second, 100*time.Millisecond)
}