492 lines
15 KiB
Go
492 lines
15 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package identifytxs
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"fmt"
|
|
"math"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/hyperledger/fabric-protos-go/common"
|
|
"github.com/hyperledger/fabric/common/ledger/blkstorage"
|
|
"github.com/hyperledger/fabric/common/metrics/disabled"
|
|
"github.com/hyperledger/fabric/core/ledger/kvledger"
|
|
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
|
|
"github.com/hyperledger/fabric/internal/fileutil"
|
|
"github.com/hyperledger/fabric/internal/ledgerutil/jsonrw"
|
|
"github.com/hyperledger/fabric/internal/ledgerutil/models"
|
|
"github.com/hyperledger/fabric/protoutil"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Fixed names and markers used when locating the block store and when
// decoding namespaces found in the input records.
const (
	// ledgersDataDirName is joined onto the caller-supplied file system path
	// before the block store path is resolved.
	ledgersDataDirName = "ledgersData"
	// nsJoiner, followed by a data prefix, separates a namespace from a
	// collection name in a combined namespace string.
	nsJoiner = "$$"
	// pvtDataPrefix is presumably the marker for private data namespaces;
	// it is not referenced in this file - TODO confirm against other users.
	pvtDataPrefix = "p"
	// hashDataPrefix follows nsJoiner in namespaces of hashed (private data
	// hash) records.
	hashDataPrefix = "h"
)
|
|
|
|
// IdentifyTxs - IdentifyTxs identifies all transactions related to a list of records (namespace / key pairs)
|
|
// Tool was originally created to map list of divergent records from ledgerutil compare output to their respective transactions
|
|
// to identify list of transactions that have potentially caused divergent state
|
|
// Returns two block numbers that represent the starting and ending blocks of the transaction search range
|
|
func IdentifyTxs(recPath string, fsPath string, outputDirLoc string) (uint64, uint64, error) {
|
|
// Read diffRecord list from json
|
|
var records *models.DiffRecordList
|
|
records, err := jsonrw.LoadRecords(recPath)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
if len(records.DiffRecords) == 0 {
|
|
return 0, 0, errors.Errorf("no records were read. JSON record list is either empty or not properly formatted. Aborting identifytxs")
|
|
}
|
|
|
|
// Output directory creation
|
|
outputDirName := fmt.Sprintf("%s_identified_transactions", records.Ledgerid)
|
|
outputDirPath := filepath.Join(outputDirLoc, outputDirName)
|
|
|
|
empty, err := fileutil.CreateDirIfMissing(outputDirPath)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
if !empty {
|
|
return 0, 0, errors.Errorf("%s already exists in %s. Choose a different location or remove the existing results. Aborting identifytxs", outputDirName, outputDirLoc)
|
|
}
|
|
|
|
// Get recordsMap from json
|
|
inputKeyMapWrapper, err := generateRecordsMap(records.DiffRecords, outputDirPath)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
// Set up block store and block iterator in preparation for traversal
|
|
blockStoreProvider, err := getBlockStoreProvider(fsPath)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer blockStoreProvider.Close()
|
|
|
|
blockStoreExists, err := blockStoreProvider.Exists(records.Ledgerid)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
if !blockStoreExists {
|
|
return 0, 0, errors.Errorf("BlockStore for %s does not exist. Aborting identifytxs", records.Ledgerid)
|
|
}
|
|
blockStore, err := blockStoreProvider.Open(records.Ledgerid)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
// Identify relevant transactions and write to json
|
|
firstBlock, lastBlock, err := findAndWriteTxs(blockStore, inputKeyMapWrapper)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
return firstBlock, lastBlock, nil
|
|
}
|
|
|
|
// compKey is the composite lookup key for a record: a namespace, an optional
// collection and a key. The collection is left empty for records that are
// not hashed.
type compKey struct {
	namespace  string
	collection string
	key        string
}
|
|
|
|
// searchLimitAndJSONWriter represents data relevant for the find and write transactions algorithm
|
|
type searchLimitAndJSONWriter struct {
|
|
searchBlockLimit uint64
|
|
searchTxLimit uint64
|
|
writer *jsonrw.JSONFileWriter
|
|
}
|
|
|
|
// compKeyMap maps each record's composite key to its search limit and output
// file writer.
type compKeyMap map[compKey]*searchLimitAndJSONWriter
|
|
|
|
// Closes all open output file writers that have not reached their search height limits
|
|
// Should only be called in findAndWriteTxs
|
|
func (m compKeyMap) closeAll(blockIdx uint64, txIdx uint64, availableIdx uint64) error {
|
|
for _, d := range m {
|
|
err := d.writer.CloseList()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Check if block height was reached
|
|
if blockIdx < availableIdx || (d.searchBlockLimit <= blockIdx && d.searchTxLimit <= txIdx) {
|
|
err = d.writer.AddField("blockStoreHeightSufficient", true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
err = d.writer.AddField("blockStoreHeightSufficient", false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
err = d.writer.CloseObject()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = d.writer.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Closes an open output file writer that has reached its search height limit
|
|
// Should only be called in findAndWriteTxs
|
|
func (m compKeyMap) close(d *searchLimitAndJSONWriter) error {
|
|
err := d.writer.CloseList()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = d.writer.AddField("blockStoreHeightSufficient", true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = d.writer.CloseObject()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = d.writer.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type compKeyMapWrapper struct {
|
|
compKeyMap compKeyMap
|
|
maxBlockNum uint64
|
|
maxTxNum uint64
|
|
}
|
|
|
|
// Generates an efficient data structure for checking records during block store traversal
|
|
// and for storing output file writers
|
|
// Returns generated compKeyMap and highest record height in a compKeyMapWrapper
|
|
func generateRecordsMap(records []*models.DiffRecord, outputDirPath string) (ckmw *compKeyMapWrapper, err error) {
|
|
// Reorganize records as hashmap for faster lookups
|
|
inputKeyMap := make(compKeyMap)
|
|
maxBlock := uint64(0)
|
|
maxTx := uint64(0)
|
|
for i, r := range records {
|
|
// Confirm all records have at least namespace and key
|
|
if r.Namespace == "" || r.Key == "" {
|
|
return nil, errors.Errorf("invalid input json. Each record entry must contain both " +
|
|
"a \"namespace\" and a \"key\" field. Aborting identifytxs")
|
|
}
|
|
// Check for hashed data
|
|
var ns, coll string
|
|
if r.Hashed {
|
|
ns, coll, err = decodeHashedNs(r.Namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
ns = r.Namespace
|
|
}
|
|
// Record to compKey
|
|
ck := compKey{namespace: ns, collection: coll, key: r.Key}
|
|
// Check for duplicate entry
|
|
_, exists := inputKeyMap[ck]
|
|
if exists {
|
|
return nil, errors.Errorf("invalid input json. Contains duplicate record for "+
|
|
" {\"namespace\":\"%s\",\"key\":\"%s\"}. Aborting identifytxs", r.Namespace, r.Key)
|
|
}
|
|
// New entry, add entry and height
|
|
blockNum, txNum := r.GetLaterHeight()
|
|
// Only namespace and key were provided, record doesn't contain height. Set to "infinite" height to iterate entire block store.
|
|
if blockNum == 0 && txNum == 0 {
|
|
blockNum = math.MaxUint64
|
|
txNum = math.MaxUint64
|
|
}
|
|
// Check for max height
|
|
if blockNum > maxBlock || (blockNum == maxBlock && txNum > maxTx) {
|
|
maxBlock = blockNum
|
|
maxTx = txNum
|
|
}
|
|
// Create output file writer
|
|
filename := fmt.Sprintf("txlist%d.json", i+1)
|
|
ofw, err := jsonrw.NewJSONFileWriter(filepath.Join(outputDirPath, filename))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Write namespace and key to output and start list
|
|
err = ofw.OpenObject()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = ofw.AddField("namespace", ck.namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = ofw.AddField("key", ck.key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var emptySlice []interface{}
|
|
err = ofw.AddField("txs", emptySlice)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
inputKeyMap[ck] = &searchLimitAndJSONWriter{
|
|
searchBlockLimit: blockNum,
|
|
searchTxLimit: txNum,
|
|
writer: ofw,
|
|
}
|
|
}
|
|
|
|
return &compKeyMapWrapper{
|
|
compKeyMap: inputKeyMap,
|
|
maxBlockNum: maxBlock,
|
|
maxTxNum: maxTx,
|
|
}, nil
|
|
}
|
|
|
|
// Get a default block store provider to access the peer's block store
|
|
func getBlockStoreProvider(fsPath string) (*blkstorage.BlockStoreProvider, error) {
|
|
// Format path to block store
|
|
blockStorePath := kvledger.BlockStorePath(filepath.Join(fsPath, ledgersDataDirName))
|
|
isEmpty, err := fileutil.DirEmpty(blockStorePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if isEmpty {
|
|
return nil, errors.Errorf("provided path %s is empty. Aborting identifytxs", fsPath)
|
|
}
|
|
// Default fields for block store provider
|
|
conf := blkstorage.NewConf(blockStorePath, 0)
|
|
indexConfig := &blkstorage.IndexConfig{
|
|
AttrsToIndex: []blkstorage.IndexableAttr{
|
|
blkstorage.IndexableAttrBlockNum,
|
|
blkstorage.IndexableAttrBlockHash,
|
|
blkstorage.IndexableAttrTxID,
|
|
blkstorage.IndexableAttrBlockNumTranNum,
|
|
},
|
|
}
|
|
metricsProvider := &disabled.Provider{}
|
|
// Create new block store provider
|
|
blockStoreProvider, err := blkstorage.NewProvider(conf, indexConfig, metricsProvider)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return blockStoreProvider, nil
|
|
}
|
|
|
|
// txEntry is the JSON representation of one relevant transaction identified
// on the block store.
type txEntry struct {
	// TxID is the transaction ID taken from the channel header.
	TxID string `json:"txid"`
	// BlockNum is the number of the block that contains the transaction.
	BlockNum uint64 `json:"blockNum"`
	// TxNum is the position of the transaction within its block.
	TxNum uint64 `json:"txNum"`
	// TxVStatus is the string form of the transaction validation code.
	TxVStatus string `json:"txValidationStatus"`
	// KeyWrite is the written value, or the hex encoded value hash for
	// hashed writes.
	KeyWrite string `json:"keyWrite"`
}
|
|
|
|
// Builds the transaction sets for each record by identifying and aggregating block store transactions containing the relevant records
|
|
func findAndWriteTxs(blockStore *blkstorage.BlockStore, inputKeyMapWrapper *compKeyMapWrapper) (uint64, uint64, error) {
|
|
// Preparation for traversing block store
|
|
blockchainInfo, err := blockStore.GetBlockchainInfo()
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// Get available block range for iterator
|
|
minBlockHeight := uint64(1)
|
|
// Check for first available block if block store is from bootstrapped peer
|
|
snapshotInfo := blockchainInfo.GetBootstrappingSnapshotInfo()
|
|
if snapshotInfo != nil {
|
|
minBlockHeight = snapshotInfo.GetLastBlockInSnapshot() + uint64(1)
|
|
}
|
|
|
|
maxAvailable := blockchainInfo.GetHeight() - 1
|
|
maxBlockHeight := maxAvailable
|
|
maxTxHeight := uint64(math.MaxUint64)
|
|
// Check if compKeyMap's highest record is earlier than block store's max block height
|
|
if inputKeyMapWrapper.maxBlockNum <= maxBlockHeight {
|
|
maxBlockHeight = inputKeyMapWrapper.maxBlockNum
|
|
maxTxHeight = inputKeyMapWrapper.maxTxNum
|
|
}
|
|
blocksItr, err := blockStore.RetrieveBlocks(minBlockHeight)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer blocksItr.Close()
|
|
|
|
inputKeyMap := inputKeyMapWrapper.compKeyMap
|
|
// Set block index to minimum allowed block height
|
|
blockIndex := minBlockHeight
|
|
txIndex := uint64(0)
|
|
// Iterate through all completed blocks of block store
|
|
for blockIndex <= maxBlockHeight {
|
|
// Unpack next block
|
|
nextBlock, err := blocksItr.Next()
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
block := nextBlock.(*common.Block)
|
|
blockData := block.GetData().GetData()
|
|
|
|
// Iterate through block transactions
|
|
for txIndexInt, envBytes := range blockData {
|
|
txIndex = uint64(txIndexInt)
|
|
// Extract next transaction
|
|
env, err := protoutil.GetEnvelopeFromBlock(envBytes)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
pl, err := protoutil.UnmarshalPayload(env.GetPayload())
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
ch, err := protoutil.UnmarshalChannelHeader(pl.Header.ChannelHeader)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// Check that transaction is endorser transaction, otherwise skip
|
|
txType := common.HeaderType(ch.Type)
|
|
if txType == common.HeaderType_ENDORSER_TRANSACTION {
|
|
txID := ch.GetTxId()
|
|
// Extract write set from transaction then iterate through transaction write set
|
|
res, err := protoutil.GetActionFromEnvelope(envBytes)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// RW set
|
|
txRWSet := &rwsetutil.TxRwSet{}
|
|
err = txRWSet.FromProtoBytes(res.GetResults())
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// Iterate namespaces in read write set
|
|
nsRWSets := txRWSet.NsRwSets
|
|
for _, nsRWSet := range nsRWSets {
|
|
namespace := nsRWSet.NameSpace
|
|
// Iterate public keys
|
|
writes := nsRWSet.KvRwSet.GetWrites()
|
|
for _, write := range writes {
|
|
key := write.GetKey()
|
|
ck := compKey{namespace: namespace, key: key}
|
|
_, exists := inputKeyMap[ck]
|
|
if exists {
|
|
// Get txValidationCode
|
|
txvCode, _, err := blockStore.RetrieveTxValidationCodeByTxID(txID)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// Create new transaction entry
|
|
entry := txEntry{
|
|
TxID: txID,
|
|
BlockNum: blockIndex,
|
|
TxNum: txIndex,
|
|
TxVStatus: txvCode.String(),
|
|
KeyWrite: string(write.GetValue()),
|
|
}
|
|
// Capture new transaction entry
|
|
ckMapEmpty, err := captureTx(inputKeyMap, ck, entry)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// No more keys left
|
|
if ckMapEmpty {
|
|
return minBlockHeight, blockIndex, nil
|
|
}
|
|
}
|
|
}
|
|
// Iterate collections
|
|
collections := nsRWSet.CollHashedRwSets
|
|
for _, collection := range collections {
|
|
// Iterate private key hashes
|
|
hashedWrites := collection.HashedRwSet.GetHashedWrites()
|
|
for _, hashedWrite := range hashedWrites {
|
|
keyHash := hashedWrite.GetKeyHash()
|
|
// Get hexadecimal encoding of key hash
|
|
ck := compKey{namespace: namespace, collection: (collection.CollectionName), key: hex.EncodeToString(keyHash)}
|
|
_, exists := inputKeyMap[ck]
|
|
if exists {
|
|
// Get txValidationCode
|
|
txvCode, _, err := blockStore.RetrieveTxValidationCodeByTxID(txID)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// Create new transaction entry
|
|
entry := txEntry{
|
|
TxID: txID,
|
|
BlockNum: blockIndex,
|
|
TxNum: txIndex,
|
|
TxVStatus: txvCode.String(),
|
|
KeyWrite: hex.EncodeToString(hashedWrite.GetValueHash()),
|
|
}
|
|
// Capture new transaction entry
|
|
ckMapEmpty, err := captureTx(inputKeyMap, ck, entry)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
// No more keys left
|
|
if ckMapEmpty {
|
|
return minBlockHeight, blockIndex, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// Check if highest record in inputKeyMap has been reached
|
|
if blockIndex == maxBlockHeight && txIndex == maxTxHeight {
|
|
err = inputKeyMap.closeAll(blockIndex, txIndex, maxAvailable)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return minBlockHeight, maxBlockHeight, nil
|
|
}
|
|
}
|
|
}
|
|
blockIndex++
|
|
}
|
|
// Close out any remaining open output file writers
|
|
err = inputKeyMap.closeAll((blockIndex - 1), txIndex, maxAvailable)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return minBlockHeight, maxBlockHeight, nil
|
|
}
|
|
|
|
// Adds a transaction entry to a key's JSONFileWriter then removes key if transaction is at its record height
|
|
// Returns true if compKeyMap is empty and false if compKeyMap still has keys to search for
|
|
func captureTx(inputKeyMap compKeyMap, k compKey, e txEntry) (bool, error) {
|
|
// Get JSONFileWriter
|
|
v := inputKeyMap[k]
|
|
// Add new transaction entry
|
|
err := v.writer.AddEntry(e)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
// Check if record height has been reached and remove key if so
|
|
if e.BlockNum == v.searchBlockLimit && e.TxNum == v.searchTxLimit {
|
|
err = inputKeyMap.close(v)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
delete(inputKeyMap, k)
|
|
// Check if all keys have reached their height limit
|
|
if len(inputKeyMap) == 0 {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// Exctracts namespace from snapshot namespace concatenation
|
|
func decodeHashedNs(hashedDataNs string) (string, string, error) {
|
|
strs := strings.Split(hashedDataNs, nsJoiner+hashDataPrefix)
|
|
if len(strs) != 2 {
|
|
return "", "", errors.Errorf("not a valid hashedDataNs [%s]", hashedDataNs)
|
|
}
|
|
return strs[0], strs[1], nil
|
|
}
|