// go_study/fabric-main/internal/ledgerutil/identifytxs/identifytxs.go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package identifytxs

import (
"encoding/hex"
"fmt"
"math"
"path/filepath"
"strings"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/internal/fileutil"
"github.com/hyperledger/fabric/internal/ledgerutil/jsonrw"
"github.com/hyperledger/fabric/internal/ledgerutil/models"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
const (
	ledgersDataDirName = "ledgersData"
	nsJoiner           = "$$"
	pvtDataPrefix      = "p"
	hashDataPrefix     = "h"
)

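// Note: hashed-data namespaces in the compare output concatenate the chaincode
// namespace and collection name with these markers, i.e. <ns>$$h<coll> for key
// hashes; pvtDataPrefix suggests the parallel <ns>$$p<coll> form for private
// data, though only the hashed form is decoded here (see decodeHashedNs below).
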
// IdentifyTxs identifies all transactions related to a list of records (namespace/key pairs).
// The tool was originally created to map the list of divergent records from ledgerutil
// compare output to their respective transactions, in order to identify the transactions
// that potentially caused the divergent state.
// Returns two block numbers representing the start and end of the transaction search range.
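//
// A minimal usage sketch (the paths are illustrative, not defaults):
//
//	first, last, err := IdentifyTxs(
//		"./compare_out/records.json",  // record list, e.g. from ledgerutil compare
//		"/var/hyperledger/production", // peer's file system path
//		"./identifytxs_out",           // where the txlist<N>.json files are written
//	)
//	if err != nil {
//		// handle error
//	}
//	fmt.Printf("searched blocks %d through %d\n", first, last)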
func IdentifyTxs(recPath string, fsPath string, outputDirLoc string) (uint64, uint64, error) {
	// Read diffRecord list from json
	records, err := jsonrw.LoadRecords(recPath)
	if err != nil {
		return 0, 0, err
	}
	if len(records.DiffRecords) == 0 {
		return 0, 0, errors.Errorf("no records were read. JSON record list is either empty or not properly formatted. Aborting identifytxs")
	}
	// Output directory creation
	outputDirName := fmt.Sprintf("%s_identified_transactions", records.Ledgerid)
	outputDirPath := filepath.Join(outputDirLoc, outputDirName)
	empty, err := fileutil.CreateDirIfMissing(outputDirPath)
	if err != nil {
		return 0, 0, err
	}
	if !empty {
		return 0, 0, errors.Errorf("%s already exists in %s. Choose a different location or remove the existing results. Aborting identifytxs", outputDirName, outputDirLoc)
	}
	// Get recordsMap from json
	inputKeyMapWrapper, err := generateRecordsMap(records.DiffRecords, outputDirPath)
	if err != nil {
		return 0, 0, err
	}
	// Set up block store and block iterator in preparation for traversal
	blockStoreProvider, err := getBlockStoreProvider(fsPath)
	if err != nil {
		return 0, 0, err
	}
	defer blockStoreProvider.Close()
	blockStoreExists, err := blockStoreProvider.Exists(records.Ledgerid)
	if err != nil {
		return 0, 0, err
	}
	if !blockStoreExists {
		return 0, 0, errors.Errorf("BlockStore for %s does not exist. Aborting identifytxs", records.Ledgerid)
	}
	blockStore, err := blockStoreProvider.Open(records.Ledgerid)
	if err != nil {
		return 0, 0, err
	}
	// Identify relevant transactions and write to json
	firstBlock, lastBlock, err := findAndWriteTxs(blockStore, inputKeyMapWrapper)
	if err != nil {
		return 0, 0, err
	}
	return firstBlock, lastBlock, nil
}

// compKey represents a composite key: a namespace and key pair, plus a collection
// name for private data hashes
type compKey struct {
	namespace, collection, key string
}

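// Public-data writes are looked up with an empty collection, e.g.
//
//	compKey{namespace: "marbles", key: "marble1"}
//
// while private-data hashed writes carry the collection name and a hex-encoded
// key hash (values illustrative):
//
//	compKey{namespace: "marbles", collection: "collectionMarbles", key: "ab12..."}
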
// searchLimitAndJSONWriter holds a record's search height limit and its output
// file writer for the find-and-write-transactions pass
type searchLimitAndJSONWriter struct {
	searchBlockLimit uint64
	searchTxLimit    uint64
	writer           *jsonrw.JSONFileWriter
}

type compKeyMap map[compKey]*searchLimitAndJSONWriter

// closeAll closes all output file writers that are still open, recording for each
// whether the block store height was sufficient to cover its search range.
// Should only be called in findAndWriteTxs.
func (m compKeyMap) closeAll(blockIdx uint64, txIdx uint64, availableIdx uint64) error {
	for _, d := range m {
		err := d.writer.CloseList()
		if err != nil {
			return err
		}
		// Check if the search height was reached within the available blocks
		if blockIdx < availableIdx || (d.searchBlockLimit <= blockIdx && d.searchTxLimit <= txIdx) {
			err = d.writer.AddField("blockStoreHeightSufficient", true)
		} else {
			err = d.writer.AddField("blockStoreHeightSufficient", false)
		}
		if err != nil {
			return err
		}
		err = d.writer.CloseObject()
		if err != nil {
			return err
		}
		err = d.writer.Close()
		if err != nil {
			return err
		}
	}
	return nil
}

// close closes a single output file writer whose record has reached its search
// height limit. Should only be called during the findAndWriteTxs pass (via captureTx).
func (m compKeyMap) close(d *searchLimitAndJSONWriter) error {
	err := d.writer.CloseList()
	if err != nil {
		return err
	}
	err = d.writer.AddField("blockStoreHeightSufficient", true)
	if err != nil {
		return err
	}
	err = d.writer.CloseObject()
	if err != nil {
		return err
	}
	err = d.writer.Close()
	if err != nil {
		return err
	}
	return nil
}

type compKeyMapWrapper struct {
	compKeyMap  compKeyMap
	maxBlockNum uint64
	maxTxNum    uint64
}

// generateRecordsMap builds an efficient data structure for checking records during
// block store traversal and for storing the per-record output file writers.
// Returns the generated compKeyMap and the highest record height in a compKeyMapWrapper.
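//
// Each input record must carry at least "namespace" and "key" fields; a hashed
// flag marks records whose namespace encodes a collection, and height information
// (consumed via GetLaterHeight) is optional. A rough sketch of one record, with
// illustrative values and field names beyond namespace/key presumed from the
// models.DiffRecord struct, which defines the authoritative JSON shape:
//
//	{"namespace" : "marbles", "key" : "marble1", "hashed" : false}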
func generateRecordsMap(records []*models.DiffRecord, outputDirPath string) (ckmw *compKeyMapWrapper, err error) {
	// Reorganize records as a hashmap for faster lookups
	inputKeyMap := make(compKeyMap)
	maxBlock := uint64(0)
	maxTx := uint64(0)
	for i, r := range records {
		// Confirm all records have at least a namespace and a key
		if r.Namespace == "" || r.Key == "" {
			return nil, errors.Errorf("invalid input json. Each record entry must contain both " +
				"a \"namespace\" and a \"key\" field. Aborting identifytxs")
		}
		// Check for hashed data
		var ns, coll string
		if r.Hashed {
			ns, coll, err = decodeHashedNs(r.Namespace)
			if err != nil {
				return nil, err
			}
		} else {
			ns = r.Namespace
		}
		// Record to compKey
		ck := compKey{namespace: ns, collection: coll, key: r.Key}
		// Check for duplicate entry
		_, exists := inputKeyMap[ck]
		if exists {
			return nil, errors.Errorf("invalid input json. Contains duplicate record for "+
				"{\"namespace\":\"%s\",\"key\":\"%s\"}. Aborting identifytxs", r.Namespace, r.Key)
		}
		// New entry, add entry and height
		blockNum, txNum := r.GetLaterHeight()
		// Only namespace and key were provided; the record doesn't contain a height.
		// Set to "infinite" height to iterate the entire block store.
		if blockNum == 0 && txNum == 0 {
			blockNum = math.MaxUint64
			txNum = math.MaxUint64
		}
		// Check for max height
		if blockNum > maxBlock || (blockNum == maxBlock && txNum > maxTx) {
			maxBlock = blockNum
			maxTx = txNum
		}
		// Create output file writer
		filename := fmt.Sprintf("txlist%d.json", i+1)
		ofw, err := jsonrw.NewJSONFileWriter(filepath.Join(outputDirPath, filename))
		if err != nil {
			return nil, err
		}
		// Write namespace and key to output and start the txs list
		err = ofw.OpenObject()
		if err != nil {
			return nil, err
		}
		err = ofw.AddField("namespace", ck.namespace)
		if err != nil {
			return nil, err
		}
		err = ofw.AddField("key", ck.key)
		if err != nil {
			return nil, err
		}
		var emptySlice []interface{}
		err = ofw.AddField("txs", emptySlice)
		if err != nil {
			return nil, err
		}
		inputKeyMap[ck] = &searchLimitAndJSONWriter{
			searchBlockLimit: blockNum,
			searchTxLimit:    txNum,
			writer:           ofw,
		}
	}
	return &compKeyMapWrapper{
		compKeyMap:  inputKeyMap,
		maxBlockNum: maxBlock,
		maxTxNum:    maxTx,
	}, nil
}

// getBlockStoreProvider creates a default block store provider to access the peer's block store.
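// The store is resolved to a subdirectory of <fsPath>/ledgersData via
// kvledger.BlockStorePath, so fsPath should be the peer's file system path
// (peer.fileSystemPath in core.yaml).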
func getBlockStoreProvider(fsPath string) (*blkstorage.BlockStoreProvider, error) {
	// Format path to block store
	blockStorePath := kvledger.BlockStorePath(filepath.Join(fsPath, ledgersDataDirName))
	isEmpty, err := fileutil.DirEmpty(blockStorePath)
	if err != nil {
		return nil, err
	}
	if isEmpty {
		return nil, errors.Errorf("provided path %s is empty. Aborting identifytxs", fsPath)
	}
	// Default fields for block store provider
	conf := blkstorage.NewConf(blockStorePath, 0)
	indexConfig := &blkstorage.IndexConfig{
		AttrsToIndex: []blkstorage.IndexableAttr{
			blkstorage.IndexableAttrBlockNum,
			blkstorage.IndexableAttrBlockHash,
			blkstorage.IndexableAttrTxID,
			blkstorage.IndexableAttrBlockNumTranNum,
		},
	}
	metricsProvider := &disabled.Provider{}
	// Create new block store provider
	blockStoreProvider, err := blkstorage.NewProvider(conf, indexConfig, metricsProvider)
	if err != nil {
		return nil, err
	}
	return blockStoreProvider, nil
}

// txEntry represents a relevant transaction identified in the block store
type txEntry struct {
	TxID      string `json:"txid"`
	BlockNum  uint64 `json:"blockNum"`
	TxNum     uint64 `json:"txNum"`
	TxVStatus string `json:"txValidationStatus"`
	KeyWrite  string `json:"keyWrite"`
}

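// Based on the writer calls in this file, each txlist<N>.json output should look
// roughly like the following (values illustrative; exact formatting depends on
// jsonrw.JSONFileWriter):
//
//	{
//		"namespace" : "marbles",
//		"key" : "marble1",
//		"txs" : [
//			{"txid" : "...", "blockNum" : 5, "txNum" : 0,
//			 "txValidationStatus" : "VALID", "keyWrite" : "..."}
//		],
//		"blockStoreHeightSufficient" : true
//	}
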
// findAndWriteTxs builds the transaction set for each record by identifying and
// aggregating the block store transactions that write to the relevant records
func findAndWriteTxs(blockStore *blkstorage.BlockStore, inputKeyMapWrapper *compKeyMapWrapper) (uint64, uint64, error) {
	// Preparation for traversing block store
	blockchainInfo, err := blockStore.GetBlockchainInfo()
	if err != nil {
		return 0, 0, err
	}
	// Get available block range for iterator
	minBlockHeight := uint64(1)
	// Check for first available block if block store is from a bootstrapped peer
	snapshotInfo := blockchainInfo.GetBootstrappingSnapshotInfo()
	if snapshotInfo != nil {
		minBlockHeight = snapshotInfo.GetLastBlockInSnapshot() + 1
	}
	maxAvailable := blockchainInfo.GetHeight() - 1
	maxBlockHeight := maxAvailable
	maxTxHeight := uint64(math.MaxUint64)
	// Check if compKeyMap's highest record is earlier than the block store's max block height
	if inputKeyMapWrapper.maxBlockNum <= maxBlockHeight {
		maxBlockHeight = inputKeyMapWrapper.maxBlockNum
		maxTxHeight = inputKeyMapWrapper.maxTxNum
	}
	blocksItr, err := blockStore.RetrieveBlocks(minBlockHeight)
	if err != nil {
		return 0, 0, err
	}
	defer blocksItr.Close()
	inputKeyMap := inputKeyMapWrapper.compKeyMap
	// Set block index to minimum allowed block height
	blockIndex := minBlockHeight
	txIndex := uint64(0)
	// Iterate through all committed blocks of the block store
	for blockIndex <= maxBlockHeight {
		// Unpack next block
		nextBlock, err := blocksItr.Next()
		if err != nil {
			return 0, 0, err
		}
		block := nextBlock.(*common.Block)
		blockData := block.GetData().GetData()
		// Iterate through block transactions
		for txIndexInt, envBytes := range blockData {
			txIndex = uint64(txIndexInt)
			// Extract next transaction
			env, err := protoutil.GetEnvelopeFromBlock(envBytes)
			if err != nil {
				return 0, 0, err
			}
			pl, err := protoutil.UnmarshalPayload(env.GetPayload())
			if err != nil {
				return 0, 0, err
			}
			ch, err := protoutil.UnmarshalChannelHeader(pl.Header.ChannelHeader)
			if err != nil {
				return 0, 0, err
			}
			// Check that transaction is an endorser transaction, otherwise skip
			txType := common.HeaderType(ch.Type)
			if txType == common.HeaderType_ENDORSER_TRANSACTION {
				txID := ch.GetTxId()
				// Extract write set from transaction, then iterate through it
				res, err := protoutil.GetActionFromEnvelope(envBytes)
				if err != nil {
					return 0, 0, err
				}
				// RW set
				txRWSet := &rwsetutil.TxRwSet{}
				err = txRWSet.FromProtoBytes(res.GetResults())
				if err != nil {
					return 0, 0, err
				}
				// Iterate namespaces in read-write set
				nsRWSets := txRWSet.NsRwSets
				for _, nsRWSet := range nsRWSets {
					namespace := nsRWSet.NameSpace
					// Iterate public keys
					writes := nsRWSet.KvRwSet.GetWrites()
					for _, write := range writes {
						key := write.GetKey()
						ck := compKey{namespace: namespace, key: key}
						_, exists := inputKeyMap[ck]
						if exists {
							// Get txValidationCode
							txvCode, _, err := blockStore.RetrieveTxValidationCodeByTxID(txID)
							if err != nil {
								return 0, 0, err
							}
							// Create new transaction entry
							entry := txEntry{
								TxID:      txID,
								BlockNum:  blockIndex,
								TxNum:     txIndex,
								TxVStatus: txvCode.String(),
								KeyWrite:  string(write.GetValue()),
							}
							// Capture new transaction entry
							ckMapEmpty, err := captureTx(inputKeyMap, ck, entry)
							if err != nil {
								return 0, 0, err
							}
							// No more keys left
							if ckMapEmpty {
								return minBlockHeight, blockIndex, nil
							}
						}
					}
					// Iterate collections
					collections := nsRWSet.CollHashedRwSets
					for _, collection := range collections {
						// Iterate private key hashes
						hashedWrites := collection.HashedRwSet.GetHashedWrites()
						for _, hashedWrite := range hashedWrites {
							keyHash := hashedWrite.GetKeyHash()
							// Use the hexadecimal encoding of the key hash
							ck := compKey{namespace: namespace, collection: collection.CollectionName, key: hex.EncodeToString(keyHash)}
							_, exists := inputKeyMap[ck]
							if exists {
								// Get txValidationCode
								txvCode, _, err := blockStore.RetrieveTxValidationCodeByTxID(txID)
								if err != nil {
									return 0, 0, err
								}
								// Create new transaction entry
								entry := txEntry{
									TxID:      txID,
									BlockNum:  blockIndex,
									TxNum:     txIndex,
									TxVStatus: txvCode.String(),
									KeyWrite:  hex.EncodeToString(hashedWrite.GetValueHash()),
								}
								// Capture new transaction entry
								ckMapEmpty, err := captureTx(inputKeyMap, ck, entry)
								if err != nil {
									return 0, 0, err
								}
								// No more keys left
								if ckMapEmpty {
									return minBlockHeight, blockIndex, nil
								}
							}
						}
					}
				}
				// Check if highest record in inputKeyMap has been reached
				if blockIndex == maxBlockHeight && txIndex == maxTxHeight {
					err = inputKeyMap.closeAll(blockIndex, txIndex, maxAvailable)
					if err != nil {
						return 0, 0, err
					}
					return minBlockHeight, maxBlockHeight, nil
				}
			}
		}
		blockIndex++
	}
	// Close out any remaining open output file writers
	err = inputKeyMap.closeAll(blockIndex-1, txIndex, maxAvailable)
	if err != nil {
		return 0, 0, err
	}
	return minBlockHeight, maxBlockHeight, nil
}

// captureTx adds a transaction entry to a key's JSONFileWriter, then removes the key
// if the transaction is at its record height.
// Returns true if the compKeyMap is empty, false if it still has keys to search for.
func captureTx(inputKeyMap compKeyMap, k compKey, e txEntry) (bool, error) {
	// Get JSONFileWriter
	v := inputKeyMap[k]
	// Add new transaction entry
	err := v.writer.AddEntry(e)
	if err != nil {
		return false, err
	}
	// Check if record height has been reached and remove key if so
	if e.BlockNum == v.searchBlockLimit && e.TxNum == v.searchTxLimit {
		err = inputKeyMap.close(v)
		if err != nil {
			return false, err
		}
		delete(inputKeyMap, k)
		// Check if all keys have reached their height limit
		if len(inputKeyMap) == 0 {
			return true, nil
		}
	}
	return false, nil
}

// decodeHashedNs extracts the namespace and collection name from a snapshot
// hashed-data namespace concatenation.
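//
// The split is on nsJoiner+hashDataPrefix ("$$h"), so for an illustrative
// chaincode/collection pair, "marbles$$hcollectionMarbles" decodes to
// ("marbles", "collectionMarbles").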
func decodeHashedNs(hashedDataNs string) (string, string, error) {
	strs := strings.Split(hashedDataNs, nsJoiner+hashDataPrefix)
	if len(strs) != 2 {
		return "", "", errors.Errorf("not a valid hashedDataNs [%s]", hashedDataNs)
	}
	return strs[0], strs[1], nil
}