go_study/fabric-main/gossip/privdata/pvtdataprovider.go

615 lines
24 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privdata
import (
"bytes"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/msp"
"github.com/hyperledger/fabric-protos-go/peer"
vsccErrors "github.com/hyperledger/fabric/common/errors"
"github.com/hyperledger/fabric/common/metrics"
commonutil "github.com/hyperledger/fabric/common/util"
pvtdatasc "github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/transientstore"
pvtdatacommon "github.com/hyperledger/fabric/gossip/privdata/common"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protoutil"
)
// sleeper wraps an optional sleep function. When no function is set,
// Sleep falls back to time.Sleep.
type sleeper struct {
	sleep func(time.Duration)
}

// Sleep pauses for d using the configured sleep function, or time.Sleep
// if none was configured.
func (s sleeper) Sleep(d time.Duration) {
	pause := s.sleep
	if pause == nil {
		pause = time.Sleep
	}
	pause(d)
}
// RetrievedPvtdata carries the private data assembled for a block together
// with the state that Purge needs to clean up the transient store.
type RetrievedPvtdata struct {
// blockPvtdata is the value handed back by GetBlockPvtdata.
blockPvtdata *ledger.BlockPvtdata
// pvtdataRetrievalInfo supplies the txns list that Purge passes to PurgeByTxids.
pvtdataRetrievalInfo *pvtdataRetrievalInfo
// transientStore is the store Purge calls PurgeByTxids and PurgeBelowHeight on.
transientStore *transientstore.Store
// logger receives an error message when a purge call fails.
logger util.Logger
// purgeDurationHistogram observes how long Purge took, in seconds.
purgeDurationHistogram metrics.Histogram
// blockNum is the block number Purge uses for the height-based purge.
blockNum uint64
// transientBlockRetention is the retention value Purge compares blockNum against.
transientBlockRetention uint64
}
// GetBlockPvtdata returns the stored BlockPvtdata pointer as is (no copy is made).
func (r *RetrievedPvtdata) GetBlockPvtdata() *ledger.BlockPvtdata {
return r.blockPvtdata
}
// Purge purges private data for transactions in the block from the transient store.
// Transactions older than the retention period are considered orphaned and also purged.
//
// Purge failures are logged rather than returned. The height-based purge only
// runs when the block number is a multiple of the retention value and larger
// than it. A retention of zero disables the height-based purge. The total
// time spent is reported to the purge duration histogram in seconds.
func (r *RetrievedPvtdata) Purge() {
	purgeStart := time.Now()

	if len(r.blockPvtdata.PvtData) > 0 {
		// Finally, purge all transactions in block - valid or not valid.
		if err := r.transientStore.PurgeByTxids(r.pvtdataRetrievalInfo.txns); err != nil {
			r.logger.Errorf("Purging transactions %v failed: %s", r.pvtdataRetrievalInfo.txns, err)
		}
	}

	blockNum := r.blockNum
	retention := r.transientBlockRetention
	// The retention != 0 check must come first: a modulo by zero panics at run time.
	if retention != 0 && blockNum%retention == 0 && blockNum > retention {
		if err := r.transientStore.PurgeBelowHeight(blockNum - retention); err != nil {
			r.logger.Errorf("Failed purging data from transient store at block [%d]: %s", blockNum, err)
		}
	}

	r.purgeDurationHistogram.Observe(time.Since(purgeStart).Seconds())
}
// eligibilityComputer holds the inputs computeEligibility needs to decide
// which collections of a block this peer is eligible for.
type eligibilityComputer struct {
// logger receives the debug and error messages emitted during the computation.
logger util.Logger
// storePvtdataOfInvalidTx, when false, makes computeEligibility skip invalid transactions.
storePvtdataOfInvalidTx bool
// channelID is passed to the factory to obtain the identity deserializer.
channelID string
// selfSignedData is evaluated against a collection's access filter
// when the peer's MSP ID is not among the collection's member orgs.
selfSignedData protoutil.SignedData
// idDeserializerFactory provides the identity deserializer used to build collection policies.
idDeserializerFactory IdentityDeserializerFactory
}
// computeEligibility classifies every collection of every requested
// transaction as either eligible-and-missing or ineligible-and-missing,
// before any fetching takes place.
//
// Every transaction ID is recorded in the returned txns list, including
// invalid transactions that are otherwise skipped because
// storePvtdataOfInvalidTx is false. For eligible collections the expected
// hash and the endorsers belonging to member orgs are recorded as well.
// A failure to build a collection's access policy aborts the computation
// and is returned wrapped in a VSCCExecutionFailureError.
func (ec *eligibilityComputer) computeEligibility(mspID string, pvtdataToRetrieve []*ledger.TxPvtdataInfo) (*pvtdataRetrievalInfo, error) {
	result := &pvtdataRetrievalInfo{
		sources:                      make(map[rwSetKey][]*peer.Endorsement),
		expectedHashes:               make(map[rwSetKey][]byte),
		remainingEligibleMissingKeys: make(rwsetKeys),
		resolvedAsToReconcileLater:   make(rwsetKeys),
		ineligibleMissingKeys:        make(rwsetKeys),
	}

	for _, tx := range pvtdataToRetrieve {
		result.txns = append(result.txns, tx.TxID)

		if tx.Invalid && !ec.storePvtdataOfInvalidTx {
			ec.logger.Debugf("Skipping Tx [%s] at sequence [%d] because it's invalid.", tx.TxID, tx.SeqInBlock)
			continue
		}

		deserializer := ec.idDeserializerFactory.GetIdentityDeserializer(ec.channelID)
		for _, colInfo := range tx.CollectionPvtdataInfo {
			policy, err := pvtdatasc.NewSimpleCollection(colInfo.CollectionConfig, deserializer)
			if err != nil {
				ec.logger.Errorf("Failed to retrieve collection access policy for chaincode [%s], collection name [%s] for txID [%s]: %s.",
					colInfo.Namespace, colInfo.Collection, tx.TxID, err)
				return nil, &vsccErrors.VSCCExecutionFailureError{Err: err}
			}

			key := rwSetKey{
				txID:       tx.TxID,
				seqInBlock: tx.SeqInBlock,
				namespace:  colInfo.Namespace,
				collection: colInfo.Collection,
			}

			// Membership in MemberOrgs is checked first; the access filter is
			// only evaluated when the MSP ID is not a listed member.
			memberOrgs := policy.MemberOrgs()
			_, isMember := memberOrgs[mspID]
			if eligible := isMember || policy.AccessFilter()(ec.selfSignedData); !eligible {
				ec.logger.Debugf("Peer is not eligible for collection: chaincode [%s], "+
					"collection name [%s], txID [%s] the policy is [%#v]. Skipping.",
					colInfo.Namespace, colInfo.Collection, tx.TxID, policy)
				result.ineligibleMissingKeys[key] = rwsetInfo{}
				continue
			}

			// Every eligible key starts out as missing.
			result.remainingEligibleMissingKeys[key] = rwsetInfo{invalid: tx.Invalid}
			result.sources[key] = endorsersFromEligibleOrgs(colInfo.Namespace, colInfo.Collection, colInfo.Endorsers, memberOrgs)
			result.expectedHashes[key] = colInfo.ExpectedHash
		}
	}

	return result, nil
}
// PvtdataProvider retrieves the private data of a single block from the
// prefetched cache, the transient store, and remote peers (see RetrievePvtdata).
type PvtdataProvider struct {
// mspID is the MSP ID checked against each collection's member orgs.
mspID string
// selfSignedData is handed to the eligibility computation for access filter evaluation.
selfSignedData protoutil.SignedData
// logger receives all log output of the provider.
logger util.Logger
// listMissingPrivateDataDurationHistogram observes the eligibility computation time, in seconds.
listMissingPrivateDataDurationHistogram metrics.Histogram
// fetchDurationHistogram observes the time spent pulling from remote peers, in seconds.
fetchDurationHistogram metrics.Histogram
// purgeDurationHistogram is copied into the returned RetrievedPvtdata.
purgeDurationHistogram metrics.Histogram
// transientStore is queried by txID and also copied into the returned RetrievedPvtdata.
transientStore *transientstore.Store
// pullRetryThreshold bounds how long the remote pull loop keeps retrying.
pullRetryThreshold time.Duration
// prefetchedPvtdata is the cached private data consulted first.
prefetchedPvtdata util.PvtDataCollections
// transientBlockRetention is copied into the returned RetrievedPvtdata.
transientBlockRetention uint64
// channelID is handed to the eligibility computation.
channelID string
// blockNum is the number of the block being processed; it is used in
// log messages and as BlockSeq of the digests requested from remote peers.
blockNum uint64
// storePvtdataOfInvalidTx is handed to the eligibility computation.
storePvtdataOfInvalidTx bool
// skipPullingInvalidTransactions, when true, excludes keys of invalid
// transactions from remote pull requests.
skipPullingInvalidTransactions bool
// idDeserializerFactory is handed to the eligibility computation.
idDeserializerFactory IdentityDeserializerFactory
// fetcher performs the fetch from remote peers.
fetcher Fetcher
// sleeper is used to pause between remote pull retries.
sleeper sleeper
}
// RetrievePvtdata takes the private data items referenced by a block, works
// out which of them this peer is eligible for, and then tries to obtain the
// eligible ones from, in order: the prefetched cache, the local transient
// store, and remote peers.
//
// Each stage is skipped once nothing eligible remains missing. The remote
// stage retries, sleeping between attempts, until everything is resolved,
// the fetch reports that retrying is pointless, or pullRetryThreshold has
// elapsed. Whatever is still missing at the end is reported as missing in
// the returned block private data. An error is returned only when the
// eligibility computation fails.
func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdataInfo) (*RetrievedPvtdata, error) {
	result := &RetrievedPvtdata{
		transientStore:          pdp.transientStore,
		logger:                  pdp.logger,
		purgeDurationHistogram:  pdp.purgeDurationHistogram,
		blockNum:                pdp.blockNum,
		transientBlockRetention: pdp.transientBlockRetention,
	}

	listMissingStart := time.Now()
	computer := &eligibilityComputer{
		logger:                  pdp.logger,
		storePvtdataOfInvalidTx: pdp.storePvtdataOfInvalidTx,
		channelID:               pdp.channelID,
		selfSignedData:          pdp.selfSignedData,
		idDeserializerFactory:   pdp.idDeserializerFactory,
	}
	info, err := computer.computeEligibility(pdp.mspID, pvtdataToRetrieve)
	if err != nil {
		return nil, err
	}
	pdp.listMissingPrivateDataDurationHistogram.Observe(time.Since(listMissingStart).Seconds())

	fetched := make(rwsetByKeys)

	// remaining reports how many eligible keys are still unresolved.
	remaining := func() int { return len(info.remainingEligibleMissingKeys) }
	// finish assembles the result from whatever has been collected so far.
	finish := func() (*RetrievedPvtdata, error) {
		result.pvtdataRetrievalInfo = info
		result.blockPvtdata = pdp.prepareBlockPvtdata(fetched, info)
		return result, nil
	}

	// Nothing eligible to retrieve for this block: skip every population stage.
	total := remaining()
	if total == 0 {
		pdp.logger.Debugf("No eligible collection private write sets to fetch for block [%d]", pdp.blockNum)
		return finish()
	}

	stats := &fetchStats{}

	// Stage 1: prefetched cache.
	pdp.populateFromCache(fetched, info, pvtdataToRetrieve)
	stats.fromLocalCache = total - remaining()
	if remaining() == 0 {
		pdp.logger.Infof("Successfully fetched (or marked to reconcile later) all %d eligible collection private write sets for block [%d] %s", total, pdp.blockNum, stats)
		return finish()
	}

	// Stage 2: transient store.
	before := remaining()
	pdp.populateFromTransientStore(fetched, info)
	stats.fromTransientStore = before - remaining()
	if remaining() == 0 {
		pdp.logger.Infof("Successfully fetched (or marked to reconcile later) all %d eligible collection private write sets for block [%d] %s", total, pdp.blockNum, stats)
		return finish()
	}

	// Stage 3: remote peers.
	before = remaining()
	retryThresh := pdp.pullRetryThreshold
	pdp.logger.Debugf("Could not find all collection private write sets in local peer transient store for block [%d]", pdp.blockNum)
	pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", remaining(), retryThresh)

	startPull := time.Now()
	for remaining() > 0 && time.Since(startPull) < retryThresh {
		if !pdp.populateFromRemotePeers(fetched, info) {
			break
		}
		// Something is still missing, so pause before the next attempt.
		pdp.sleeper.Sleep(pullRetrySleepInterval)
	}
	elapsedPull := int64(time.Since(startPull) / time.Millisecond) // duration in ms
	pdp.fetchDurationHistogram.Observe(time.Since(startPull).Seconds())
	stats.fromRemotePeer = before - remaining()

	if remaining() > 0 {
		pdp.logger.Warningf("Could not fetch (or mark to reconcile later) %d eligible collection private write sets for block [%d] %s. Will commit block with missing private write sets:[%v]",
			total, pdp.blockNum, stats, info.remainingEligibleMissingKeys)
	} else {
		pdp.logger.Debugf("Fetched (or marked to reconcile later) collection private write sets from remote peers for block [%d] (%dms)", pdp.blockNum, elapsedPull)
		pdp.logger.Infof("Successfully fetched (or marked to reconcile later) all %d eligible collection private write sets for block [%d] %s", total, pdp.blockNum, stats)
	}

	return finish()
}
// populateFromCache fills pvtdata from the prefetched cache and removes every
// key it resolves from the set of remaining eligible missing keys.
//
// A cached collection whose hash matches the expected hash is copied into
// pvtdata. A cached collection with a mismatching hash is moved to
// resolvedAsToReconcileLater instead. Cached entries that were never
// requested (unknown sequence number or key) are logged and ignored.
func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo, pvtdataToRetrieve []*ledger.TxPvtdataInfo) {
	stillMissing := pvtdataRetrievalInfo.remainingEligibleMissingKeys
	pdp.logger.Debugf("Attempting to retrieve %d private write sets from cache.", len(stillMissing))

	for _, cached := range pdp.prefetchedPvtdata {
		seq := cached.SeqInBlock
		txID := getTxIDBySeqInBlock(seq, pvtdataToRetrieve)
		if txID == "" {
			// No requested transaction has this sequence number, so the
			// whole cached transaction is unrelated to the query.
			pdp.logger.Warningf("Found extra data in prefetched at sequence [%d]. Skipping.", seq)
			continue
		}

		for _, ns := range cached.WriteSet.NsPvtRwset {
			for _, col := range ns.CollectionPvtRwset {
				key := rwSetKey{
					txID:       txID,
					seqInBlock: seq,
					namespace:  ns.Namespace,
					collection: col.CollectionName,
				}

				keyInfo, wanted := stillMissing[key]
				if !wanted {
					pdp.logger.Warningf("Found extra data in prefetched:[%v]. Skipping.", key)
					continue
				}

				if !bytes.Equal(pvtdataRetrievalInfo.expectedHashes[key], commonutil.ComputeSHA256(col.Rwset)) {
					// The cache held the write set, but its hash differs from the one in the block.
					// Most likely the sending peer was bootstrapped from a snapshot, or it purged some
					// keys from the private data through a user-initiated transaction. Treat the data
					// as missing so that reconciliation retries it later.
					pvtdataRetrievalInfo.resolvedAsToReconcileLater[key] = keyInfo
				} else {
					pvtdata[key] = col.Rwset
				}

				// Either way the key is no longer outstanding.
				delete(stillMissing, key)
			}
		}
	}
}
// populateFromTransientStore populates pvtdata with data fetched from transient store
// and updates pvtdataRetrievalInfo by removing missing data that was fetched from transient store.
//
// The transient store is queried once per remaining eligible missing key.
// If obtaining or advancing an iterator fails, a warning is logged and the
// remaining keys are not attempted.
func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) {
	pdp.logger.Debugf("Attempting to retrieve %d private write sets from transient store.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys))

	// Put into pvtdata RW sets that are missing and found in the transient store.
	// Deleting entries from a map while ranging over it is permitted in Go.
	for k := range pvtdataRetrievalInfo.remainingEligibleMissingKeys {
		if ok := pdp.populateKeyFromTransientStore(k, pvtdata, pvtdataRetrievalInfo); !ok {
			return
		}
	}
}

// populateKeyFromTransientStore looks up a single missing key in the transient
// store and copies every matching, hash-verified write set into pvtdata.
// It returns false when the transient store reported an error, signalling the
// caller to stop processing further keys. Keeping this work in its own
// function ensures the iterator is closed as soon as the key has been
// handled, instead of staying open until all keys are processed.
func (pdp *PvtdataProvider) populateKeyFromTransientStore(k rwSetKey, pvtdata rwsetByKeys, info *pvtdataRetrievalInfo) bool {
	filter := ledger.NewPvtNsCollFilter()
	filter.Add(k.namespace, k.collection)

	iterator, err := pdp.transientStore.GetTxPvtRWSetByTxid(k.txID, filter)
	if err != nil {
		pdp.logger.Warningf("Failed fetching private data from transient store: Failed obtaining iterator from transient store: %s", err)
		return false
	}
	defer iterator.Close()

	for {
		res, err := iterator.Next()
		if err != nil {
			pdp.logger.Warningf("Failed fetching private data from transient store: Failed iterating over transient store data: %s", err)
			return false
		}
		if res == nil {
			// End of iteration
			return true
		}
		if res.PvtSimulationResultsWithConfig == nil {
			pdp.logger.Warningf("Resultset's PvtSimulationResultsWithConfig for txID [%s] is nil. Skipping.", k.txID)
			continue
		}
		simRes := res.PvtSimulationResultsWithConfig
		// simRes.PvtRwset will be nil if the transient store contains an entry for the txid
		// but the entry does not contain the data for the collection
		if simRes.PvtRwset == nil {
			pdp.logger.Debugf("The PvtRwset of PvtSimulationResultsWithConfig for txID [%s] is nil. Skipping.", k.txID)
			continue
		}

		for _, ns := range simRes.PvtRwset.NsPvtRwset {
			for _, col := range ns.CollectionPvtRwset {
				key := rwSetKey{
					txID:       k.txID,
					seqInBlock: k.seqInBlock,
					collection: col.CollectionName,
					namespace:  ns.Namespace,
				}
				// skip if not missing
				if _, missing := info.remainingEligibleMissingKeys[key]; !missing {
					continue
				}
				// skip entries whose hash does not match the one recorded for the key
				if !bytes.Equal(info.expectedHashes[key], commonutil.ComputeSHA256(col.Rwset)) {
					continue
				}
				// populate the pvtdata with the RW set from the transient store
				pdp.logger.Debugf("Found private data for key %v in transient store", key)
				pvtdata[key] = col.Rwset
				// remove key from missing
				delete(info.remainingEligibleMissingKeys, key)
			} // iterating over all collections
		} // iterating over all namespaces
	} // iterating over the TxPvtRWSet results
}
// populateFromRemotePeers populates pvtdata with data fetched from remote peers and updates
// pvtdataRetrievalInfo by removing missing data that was fetched from remote peers.
//
// Keys of invalid transactions are left out of the request when the peer is
// configured to skip pulling them. The return value tells the caller whether
// another attempt is worthwhile:
//   - false when there was nothing to request;
//   - true when the fetch itself failed;
//   - otherwise true only while more keys remain missing than were
//     deliberately skipped.
func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) bool {
	remaining := pvtdataRetrievalInfo.remainingEligibleMissingKeys
	pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(remaining))

	dig2src := make(map[pvtdatacommon.DigKey][]*peer.Endorsement)
	var skipped int
	for k, v := range remaining {
		if v.invalid && pdp.skipPullingInvalidTransactions {
			pdp.logger.Debugf("Skipping invalid key [%v] because peer is configured to skip pulling rwsets of invalid transactions.", k)
			skipped++
			continue
		}
		pdp.logger.Debugf("Fetching [%v] from remote peers", k)
		dig := pvtdatacommon.DigKey{
			TxId:       k.txID,
			SeqInBlock: k.seqInBlock,
			Collection: k.collection,
			Namespace:  k.namespace,
			BlockSeq:   pdp.blockNum,
		}
		dig2src[dig] = pvtdataRetrievalInfo.sources[k]
	}

	if len(dig2src) == 0 {
		return false
	}

	fetchedData, err := pdp.fetcher.fetch(dig2src)
	if err != nil {
		pdp.logger.Warningf("Failed fetching private data from remote peers for dig2src:[%v], err: %s", dig2src, err)
		return true
	}

	// Iterate over data fetched from remote peers
	for _, element := range fetchedData.AvailableElements {
		dig := element.Digest
		for _, rws := range element.Payload {
			key := rwSetKey{
				txID:       dig.TxId,
				namespace:  dig.Namespace,
				collection: dig.Collection,
				seqInBlock: dig.SeqInBlock,
			}
			// skip if not missing
			if _, missing := remaining[key]; !missing {
				// key isn't missing and was never fetched earlier, log that it wasn't originally requested
				if _, exists := pvtdata[key]; !exists {
					pdp.logger.Debugf("Ignoring [%v] because it was never requested.", key)
				}
				continue
			}
			if bytes.Equal(pvtdataRetrievalInfo.expectedHashes[key], commonutil.ComputeSHA256(rws)) {
				// populate the pvtdata with the RW set from the remote peer
				pvtdata[key] = rws
			} else {
				// the private data was fetched from the remote peer but the hash of writeset did not match with what is present in block.
				// Most likely scenarios for this are when either the sending peer is bootstrapped from a snapshot or it has purged some
				// of the keys from the private data, based on a user initiated transaction. In this case, we treat this as missing data,
				// that would be tried later via reconciliation
				pvtdataRetrievalInfo.resolvedAsToReconcileLater[key] = remaining[key]
			}
			// remove key from missing
			delete(remaining, key)
			pdp.logger.Debugf("Fetched [%v]", key)
		}
	}

	// Iterate over purged data. An rwSetKey is made up of exactly the four
	// digest fields compared here, so a direct map lookup finds the same
	// entry a scan over all remaining keys would.
	for _, dig := range fetchedData.PurgedElements {
		purgedKey := rwSetKey{
			txID:       dig.TxId,
			namespace:  dig.Namespace,
			collection: dig.Collection,
			seqInBlock: dig.SeqInBlock,
		}
		if _, missing := remaining[purgedKey]; !missing {
			continue
		}
		// delete purged key from missing keys
		delete(remaining, purgedKey)
		pdp.logger.Warningf("Missing key because was purged or will soon be purged, "+
			"continue block commit without [%+v] in private rwset", purgedKey)
	}

	return len(remaining) > skipped
}
// prepareBlockPvtdata builds the ledger.BlockPvtdata handed back to the
// consumer. Retrieved write sets are grouped per sequence number in block.
// Keys resolved for later reconciliation and keys still missing are recorded
// as missing with the eligible flag set; ineligible keys are recorded as
// missing with the flag cleared.
func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) *ledger.BlockPvtdata {
	found := make(ledger.TxPvtDataMap)
	for seq, sets := range pvtdata.bySeqsInBlock() {
		found[seq] = &ledger.TxPvtData{
			SeqInBlock: seq,
			WriteSet:   sets.toRWSet(),
		}
	}

	missing := make(ledger.TxMissingPvtData)
	groups := []struct {
		keys     rwsetKeys
		eligible bool
	}{
		{keys: pvtdataRetrievalInfo.resolvedAsToReconcileLater, eligible: true},
		{keys: pvtdataRetrievalInfo.remainingEligibleMissingKeys, eligible: true},
		{keys: pvtdataRetrievalInfo.ineligibleMissingKeys, eligible: false},
	}
	for _, group := range groups {
		for key := range group.keys {
			missing.Add(key.seqInBlock, key.namespace, key.collection, group.eligible)
		}
	}

	return &ledger.BlockPvtdata{
		PvtData:        found,
		MissingPvtData: missing,
	}
}
// pvtdataRetrievalInfo tracks, per rwSetKey, what is known about the private
// data of a block while it is being retrieved.
type pvtdataRetrievalInfo struct {
// sources maps each eligible key to the endorsements used as sources when
// requesting the key from remote peers.
sources map[rwSetKey][]*peer.Endorsement
// expectedHashes maps each eligible key to the hash that retrieved data must match.
expectedHashes map[rwSetKey][]byte
// txns lists the IDs of all transactions passed to the eligibility computation.
txns []string
// remainingEligibleMissingKeys holds the eligible keys that have not been resolved yet.
remainingEligibleMissingKeys rwsetKeys
// resolvedAsToReconcileLater holds keys whose retrieved data did not match
// the expected hash; they are reported as eligible missing data.
resolvedAsToReconcileLater rwsetKeys
// ineligibleMissingKeys holds the keys this peer is not eligible for.
ineligibleMissingKeys rwsetKeys
}
// rwset types

// readWriteSets is a list of collection write sets, each tagged with its key.
type readWriteSets []*readWriteSet

// toRWSet folds the list into a single TxPvtReadWriteSet, grouping the
// collection write sets under their namespace. Namespaces are emitted in
// map iteration order, so their order is not deterministic.
func (s readWriteSets) toRWSet() *rwset.TxPvtReadWriteSet {
	byNamespace := make(map[string]*rwset.NsPvtReadWriteSet)
	for _, entry := range s {
		nsSet, seen := byNamespace[entry.namespace]
		if !seen {
			nsSet = &rwset.NsPvtReadWriteSet{Namespace: entry.namespace}
			byNamespace[entry.namespace] = nsSet
		}
		nsSet.CollectionPvtRwset = append(nsSet.CollectionPvtRwset, &rwset.CollectionPvtReadWriteSet{
			CollectionName: entry.collection,
			Rwset:          entry.rws,
		})
	}

	var nsList []*rwset.NsPvtReadWriteSet
	for _, nsSet := range byNamespace {
		nsList = append(nsList, nsSet)
	}

	return &rwset.TxPvtReadWriteSet{
		DataModel:  rwset.TxReadWriteSet_KV,
		NsPvtRwset: nsList,
	}
}
// readWriteSet pairs a raw collection write set with the key identifying it.
type readWriteSet struct {
	rwSetKey
	rws []byte
}

// rwsetByKeys maps a key to the raw bytes of its collection write set.
type rwsetByKeys map[rwSetKey][]byte

// bySeqsInBlock regroups the write sets by the sequence number of their
// transaction within the block.
func (s rwsetByKeys) bySeqsInBlock() map[uint64]readWriteSets {
	grouped := make(map[uint64]readWriteSets)
	for key, payload := range s {
		entry := &readWriteSet{rwSetKey: key, rws: payload}
		seq := key.seqInBlock
		grouped[seq] = append(grouped[seq], entry)
	}
	return grouped
}
// rwsetInfo carries per-key metadata: whether the owning transaction is invalid.
type rwsetInfo struct {
	invalid bool
}

// rwsetKeys is a set of keys, each with its associated rwsetInfo.
type rwsetKeys map[rwSetKey]rwsetInfo

// String renders every key on its own line, each terminated by a newline.
// Keys appear in map iteration order, so the line order is not deterministic.
func (s rwsetKeys) String() string {
	var out bytes.Buffer
	for key := range s {
		fmt.Fprintf(&out, "%s\n", key.String())
	}
	return out.String()
}
// rwSetKey identifies one collection write set of one transaction in a block.
type rwSetKey struct {
	txID       string
	seqInBlock uint64
	namespace  string
	collection string
}

// String formats the key's four fields as a single human-readable line.
func (k *rwSetKey) String() string {
	const layout = "txID: %s, seq: %d, namespace: %s, collection: %s"
	return fmt.Sprintf(layout, k.txID, k.seqInBlock, k.namespace, k.collection)
}
// getTxIDBySeqInBlock returns the transaction ID of the first requested item
// whose sequence number equals seqInBlock, or the empty string when no item
// matches.
func getTxIDBySeqInBlock(seqInBlock uint64, pvtdataToRetrieve []*ledger.TxPvtdataInfo) string {
	for i := range pvtdataToRetrieve {
		if candidate := pvtdataToRetrieve[i]; candidate.SeqInBlock == seqInBlock {
			return candidate.TxID
		}
	}
	return ""
}
// endorsersFromEligibleOrgs returns the endorsements whose endorser identity
// belongs to one of the given orgs, preserving input order. Endorsements
// whose identity cannot be unmarshalled are logged and dropped. ns and col
// are used only in the debug message for rejected endorsers.
func endorsersFromEligibleOrgs(ns string, col string, endorsers []*peer.Endorsement, orgs map[string]struct{}) []*peer.Endorsement {
	var eligible []*peer.Endorsement
	for _, endorsement := range endorsers {
		identity := &msp.SerializedIdentity{}
		if err := proto.Unmarshal(endorsement.Endorser, identity); err != nil {
			logger.Warning("Failed unmarshalling endorser:", err)
			continue
		}
		if _, inOrgs := orgs[identity.Mspid]; inOrgs {
			eligible = append(eligible, endorsement)
			continue
		}
		logger.Debug(identity.Mspid, "isn't among the collection's orgs:", orgs, "for namespace", ns, ",collection", col)
	}
	return eligible
}
// fetchStats counts how many private write sets were resolved by each source.
type fetchStats struct {
	fromLocalCache, fromTransientStore, fromRemotePeer int
}

// String renders the three counters as a parenthesised summary for log messages.
func (stats fetchStats) String() string {
	const layout = "(%d from local cache, %d from transient store, %d from other peers)"
	return fmt.Sprintf(layout, stats.fromLocalCache, stats.fromTransientStore, stats.fromRemotePeer)
}