go_study/fabric-main/common/ledger/blkstorage/blockfile_mgr_test.go

515 lines
19 KiB
Go

/*
Copyright IBM Corp. 2016 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blkstorage
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/internal/pkg/txflags"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)
func TestBlockfileMgrBlockReadWrite(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.testGetBlockByHash(blocks)
blkfileMgrWrapper.testGetBlockByNumber(blocks)
}
func TestAddBlockWithWrongHash(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks[0:9])
lastBlock := blocks[9]
lastBlock.Header.PreviousHash = []byte("someJunkHash") // set the hash to something unexpected
err := blkfileMgrWrapper.blockfileMgr.addBlock(lastBlock)
require.Error(t, err, "An error is expected when adding a block with some unexpected hash")
require.Contains(t, err.Error(), "unexpected Previous block hash. Expected PreviousHash")
t.Logf("err = %s", err)
}
func TestBlockfileMgrCrashDuringWriting(t *testing.T) {
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 10, false)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 1, false)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 0, false)
testBlockfileMgrCrashDuringWriting(t, 0, 0, 1000, 10, false)
testBlockfileMgrCrashDuringWriting(t, 0, 5, 1000, 10, false)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 10, true)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 1, true)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 0, true)
testBlockfileMgrCrashDuringWriting(t, 0, 0, 1000, 10, true)
testBlockfileMgrCrashDuringWriting(t, 0, 5, 1000, 10, true)
}
func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlksBeforeSavingBlkfilesInfo int,
numBlksAfterSavingBlkfilesInfo int, numLastBlockBytes int, numPartialBytesToWrite int,
deleteBFInfo bool) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg, gb := testutil.NewBlockGenerator(t, ledgerid, false)
// create all necessary blocks
totalBlocks := numBlksBeforeSavingBlkfilesInfo + numBlksAfterSavingBlkfilesInfo
allBlocks := []*common.Block{gb}
allBlocks = append(allBlocks, bg.NextTestBlocks(totalBlocks+1)...)
// identify the blocks that are to be added beforeCP, afterCP, and after restart
blocksBeforeSavingBlkfilesInfo := []*common.Block{}
blocksAfterSavingBlkfilesInfo := []*common.Block{}
if numBlksBeforeSavingBlkfilesInfo != 0 {
blocksBeforeSavingBlkfilesInfo = allBlocks[0:numBlksBeforeSavingBlkfilesInfo]
}
if numBlksAfterSavingBlkfilesInfo != 0 {
blocksAfterSavingBlkfilesInfo = allBlocks[numBlksBeforeSavingBlkfilesInfo : numBlksBeforeSavingBlkfilesInfo+numBlksAfterSavingBlkfilesInfo]
}
blocksAfterRestart := allBlocks[numBlksBeforeSavingBlkfilesInfo+numBlksAfterSavingBlkfilesInfo:]
// add blocks before cp
blkfileMgrWrapper.addBlocks(blocksBeforeSavingBlkfilesInfo)
currentBlkfilesInfo := blkfileMgrWrapper.blockfileMgr.blockfilesInfo
blkfilesInfo1 := &blockfilesInfo{
latestFileNumber: currentBlkfilesInfo.latestFileNumber,
latestFileSize: currentBlkfilesInfo.latestFileSize,
noBlockFiles: currentBlkfilesInfo.noBlockFiles,
lastPersistedBlock: currentBlkfilesInfo.lastPersistedBlock,
}
// add blocks after cp
blkfileMgrWrapper.addBlocks(blocksAfterSavingBlkfilesInfo)
blkfilesInfo2 := blkfileMgrWrapper.blockfileMgr.blockfilesInfo
// simulate a crash scenario
lastBlockBytes := []byte{}
encodedLen := proto.EncodeVarint(uint64(numLastBlockBytes))
randomBytes := testutil.ConstructRandomBytes(t, numLastBlockBytes)
lastBlockBytes = append(lastBlockBytes, encodedLen...)
lastBlockBytes = append(lastBlockBytes, randomBytes...)
partialBytes := lastBlockBytes[:numPartialBytesToWrite]
blkfileMgrWrapper.blockfileMgr.currentFileWriter.append(partialBytes, true)
if deleteBFInfo {
err := blkfileMgrWrapper.blockfileMgr.db.Delete(blkMgrInfoKey, true)
require.NoError(t, err)
} else {
blkfileMgrWrapper.blockfileMgr.saveBlkfilesInfo(blkfilesInfo1, true)
}
blkfileMgrWrapper.close()
// simulate a start after a crash
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
blkfilesInfo3 := blkfileMgrWrapper.blockfileMgr.blockfilesInfo
require.Equal(t, blkfilesInfo2, blkfilesInfo3)
// add fresh blocks after restart
blkfileMgrWrapper.addBlocks(blocksAfterRestart)
testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 0, len(allBlocks)-1, allBlocks)
}
func TestBlockfileMgrBlockIterator(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 0, 7, blocks[0:8])
}
func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
firstBlockNum int, lastBlockNum int, expectedBlocks []*common.Block) {
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum))
require.NoError(t, err, "Error while getting blocks iterator")
defer itr.Close()
numBlocksItrated := 0
for {
block, err := itr.Next()
require.NoError(t, err, "Error while getting block number [%d] from iterator", numBlocksItrated)
require.Equal(t, expectedBlocks[numBlocksItrated], block)
numBlocksItrated++
if numBlocksItrated == lastBlockNum-firstBlockNum+1 {
break
}
}
require.Equal(t, lastBlockNum-firstBlockNum+1, numBlocksItrated)
}
func TestBlockfileMgrBlockchainInfo(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
bcInfo := blkfileMgrWrapper.blockfileMgr.getBlockchainInfo()
require.Equal(t, &common.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}, bcInfo)
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
bcInfo = blkfileMgrWrapper.blockfileMgr.getBlockchainInfo()
require.Equal(t, uint64(10), bcInfo.Height)
}
func TestTxIDExists(t *testing.T) {
t.Run("green-path", func(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkStore, err := env.provider.Open("testLedger")
require.NoError(t, err)
defer blkStore.Shutdown()
blocks := testutil.ConstructTestBlocks(t, 2)
for _, blk := range blocks {
require.NoError(t, blkStore.AddBlock(blk))
}
for _, blk := range blocks {
for i := range blk.Data.Data {
txID, err := protoutil.GetOrComputeTxIDFromEnvelope(blk.Data.Data[i])
require.NoError(t, err)
exists, err := blkStore.TxIDExists(txID)
require.NoError(t, err)
require.True(t, exists)
}
}
exists, err := blkStore.TxIDExists("non-existent-txid")
require.NoError(t, err)
require.False(t, exists)
})
t.Run("error-path", func(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkStore, err := env.provider.Open("testLedger")
require.NoError(t, err)
defer blkStore.Shutdown()
env.provider.Close()
exists, err := blkStore.TxIDExists("random")
require.EqualError(t, err, "error while trying to check the presence of TXID [random]: internal leveldb error while obtaining db iterator: leveldb: closed")
require.False(t, exists)
})
}
func TestBlockfileMgrGetTxById(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 2)
blkfileMgrWrapper.addBlocks(blocks)
for _, blk := range blocks {
for j, txEnvelopeBytes := range blk.Data.Data {
// blockNum starts with 0
txID, err := protoutil.GetOrComputeTxIDFromEnvelope(blk.Data.Data[j])
require.NoError(t, err)
txEnvelopeFromFileMgr, err := blkfileMgrWrapper.blockfileMgr.retrieveTransactionByID(txID)
require.NoError(t, err, "Error while retrieving tx from blkfileMgr")
txEnvelope, err := protoutil.GetEnvelopeFromBlock(txEnvelopeBytes)
require.NoError(t, err, "Error while unmarshalling tx")
require.Equal(t, txEnvelope, txEnvelopeFromFileMgr)
}
}
}
// TestBlockfileMgrGetTxByIdDuplicateTxid tests that a transaction with an existing txid
// (within same block or a different block) should not over-write the index by-txid (FAB-8557)
func TestBlockfileMgrGetTxByIdDuplicateTxid(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkStore, err := env.provider.Open("testLedger")
require.NoError(env.t, err)
blkFileMgr := blkStore.fileMgr
bg, gb := testutil.NewBlockGenerator(t, "testLedger", false)
require.NoError(t, blkFileMgr.addBlock(gb))
block1 := bg.NextBlockWithTxid(
[][]byte{
[]byte("tx with id=txid-1"),
[]byte("tx with id=txid-2"),
[]byte("another tx with existing id=txid-1"),
},
[]string{"txid-1", "txid-2", "txid-1"},
)
txValidationFlags := txflags.New(3)
txValidationFlags.SetFlag(0, peer.TxValidationCode_VALID)
txValidationFlags.SetFlag(1, peer.TxValidationCode_INVALID_OTHER_REASON)
txValidationFlags.SetFlag(2, peer.TxValidationCode_DUPLICATE_TXID)
block1.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txValidationFlags
require.NoError(t, blkFileMgr.addBlock(block1))
block2 := bg.NextBlockWithTxid(
[][]byte{
[]byte("tx with id=txid-3"),
[]byte("yet another tx with existing id=txid-1"),
},
[]string{"txid-3", "txid-1"},
)
txValidationFlags = txflags.New(2)
txValidationFlags.SetFlag(0, peer.TxValidationCode_VALID)
txValidationFlags.SetFlag(1, peer.TxValidationCode_DUPLICATE_TXID)
block2.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txValidationFlags
require.NoError(t, blkFileMgr.addBlock(block2))
txenvp1, err := protoutil.GetEnvelopeFromBlock(block1.Data.Data[0])
require.NoError(t, err)
txenvp2, err := protoutil.GetEnvelopeFromBlock(block1.Data.Data[1])
require.NoError(t, err)
txenvp3, err := protoutil.GetEnvelopeFromBlock(block2.Data.Data[0])
require.NoError(t, err)
indexedTxenvp, _ := blkFileMgr.retrieveTransactionByID("txid-1")
require.Equal(t, txenvp1, indexedTxenvp)
indexedTxenvp, _ = blkFileMgr.retrieveTransactionByID("txid-2")
require.Equal(t, txenvp2, indexedTxenvp)
indexedTxenvp, _ = blkFileMgr.retrieveTransactionByID("txid-3")
require.Equal(t, txenvp3, indexedTxenvp)
blk, _ := blkFileMgr.retrieveBlockByTxID("txid-1")
require.Equal(t, block1, blk)
blk, _ = blkFileMgr.retrieveBlockByTxID("txid-2")
require.Equal(t, block1, blk)
blk, _ = blkFileMgr.retrieveBlockByTxID("txid-3")
require.Equal(t, block2, blk)
validationCode, blkNum, _ := blkFileMgr.retrieveTxValidationCodeByTxID("txid-1")
require.Equal(t, peer.TxValidationCode_VALID, validationCode)
require.Equal(t, uint64(1), blkNum)
validationCode, blkNum, _ = blkFileMgr.retrieveTxValidationCodeByTxID("txid-2")
require.Equal(t, peer.TxValidationCode_INVALID_OTHER_REASON, validationCode)
require.Equal(t, uint64(1), blkNum)
validationCode, blkNum, _ = blkFileMgr.retrieveTxValidationCodeByTxID("txid-3")
require.Equal(t, peer.TxValidationCode_VALID, validationCode)
require.Equal(t, uint64(2), blkNum)
// though we do not expose an API for retrieving all the txs by same id but we may in future
// and the data is persisted to support this. below code tests this behavior internally
w := &testBlockfileMgrWrapper{
t: t,
blockfileMgr: blkFileMgr,
}
w.testGetMultipleDataByTxID(
"txid-1",
[]*expectedBlkTxValidationCode{
{
blk: block1,
txEnv: protoutil.ExtractEnvelopeOrPanic(block1, 0),
validationCode: peer.TxValidationCode_VALID,
},
{
blk: block1,
txEnv: protoutil.ExtractEnvelopeOrPanic(block1, 2),
validationCode: peer.TxValidationCode_DUPLICATE_TXID,
},
{
blk: block2,
txEnv: protoutil.ExtractEnvelopeOrPanic(block2, 1),
validationCode: peer.TxValidationCode_DUPLICATE_TXID,
},
},
)
w.testGetMultipleDataByTxID(
"txid-2",
[]*expectedBlkTxValidationCode{
{
blk: block1,
txEnv: protoutil.ExtractEnvelopeOrPanic(block1, 1),
validationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
},
},
)
w.testGetMultipleDataByTxID(
"txid-3",
[]*expectedBlkTxValidationCode{
{
blk: block2,
txEnv: protoutil.ExtractEnvelopeOrPanic(block2, 0),
validationCode: peer.TxValidationCode_VALID,
},
},
)
}
func TestBlockfileMgrGetTxByBlockNumTranNum(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
for blockIndex, blk := range blocks {
for tranIndex, txEnvelopeBytes := range blk.Data.Data {
// blockNum and tranNum both start with 0
txEnvelopeFromFileMgr, err := blkfileMgrWrapper.blockfileMgr.retrieveTransactionByBlockNumTranNum(uint64(blockIndex), uint64(tranIndex))
require.NoError(t, err, "Error while retrieving tx from blkfileMgr")
txEnvelope, err := protoutil.GetEnvelopeFromBlock(txEnvelopeBytes)
require.NoError(t, err, "Error while unmarshalling tx")
require.Equal(t, txEnvelope, txEnvelopeFromFileMgr)
}
}
}
func TestBlockfileMgrRestart(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
expectedHeight := uint64(10)
require.Equal(t, expectedHeight, blkfileMgrWrapper.blockfileMgr.getBlockchainInfo().Height)
blkfileMgrWrapper.close()
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
require.Equal(t, 9, int(blkfileMgrWrapper.blockfileMgr.blockfilesInfo.lastPersistedBlock))
blkfileMgrWrapper.testGetBlockByHash(blocks)
require.Equal(t, expectedHeight, blkfileMgrWrapper.blockfileMgr.getBlockchainInfo().Height)
}
func TestBlockfileMgrFileRolling(t *testing.T) {
blocks := testutil.ConstructTestBlocks(t, 200)
size := 0
for _, block := range blocks[:100] {
by, _, err := serializeBlock(block)
require.NoError(t, err, "Error while serializing block")
blockBytesSize := len(by)
encodedLen := proto.EncodeVarint(uint64(blockBytesSize))
size += blockBytesSize + len(encodedLen)
}
maxFileSie := int(0.75 * float64(size))
env := newTestEnv(t, NewConf(t.TempDir(), maxFileSie))
defer env.Cleanup()
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
blkfileMgrWrapper.addBlocks(blocks[:100])
require.Equal(t, 1, blkfileMgrWrapper.blockfileMgr.blockfilesInfo.latestFileNumber)
blkfileMgrWrapper.testGetBlockByHash(blocks[:100])
blkfileMgrWrapper.close()
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
blkfileMgrWrapper.addBlocks(blocks[100:])
require.Equal(t, 2, blkfileMgrWrapper.blockfileMgr.blockfilesInfo.latestFileNumber)
blkfileMgrWrapper.testGetBlockByHash(blocks[100:])
}
func TestBlockfileMgrGetBlockByTxID(t *testing.T) {
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
for _, blk := range blocks {
for j := range blk.Data.Data {
// blockNum starts with 1
txID, err := protoutil.GetOrComputeTxIDFromEnvelope(blk.Data.Data[j])
require.NoError(t, err)
blockFromFileMgr, err := blkfileMgrWrapper.blockfileMgr.retrieveBlockByTxID(txID)
require.NoError(t, err, "Error while retrieving block from blkfileMgr")
require.Equal(t, blk, blockFromFileMgr)
}
}
}
func TestBlockfileMgrSimulateCrashAtFirstBlockInFile(t *testing.T) {
t.Run("blockfilesInfo persisted", func(t *testing.T) {
testBlockfileMgrSimulateCrashAtFirstBlockInFile(t, false)
})
t.Run("blockfilesInfo to be computed from block files", func(t *testing.T) {
testBlockfileMgrSimulateCrashAtFirstBlockInFile(t, true)
})
}
func testBlockfileMgrSimulateCrashAtFirstBlockInFile(t *testing.T, deleteBlkfilesInfo bool) {
// open blockfileMgr and add 5 blocks
env := newTestEnv(t, NewConf(t.TempDir(), 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
blockfileMgr := blkfileMgrWrapper.blockfileMgr
blocks := testutil.ConstructTestBlocks(t, 10)
for i := 0; i < 10; i++ {
fmt.Printf("blocks[i].Header.Number = %d\n", blocks[i].Header.Number)
}
blkfileMgrWrapper.addBlocks(blocks[:5])
firstFilePath := blockfileMgr.currentFileWriter.filePath
firstBlkFileSize := testutilGetFileSize(t, firstFilePath)
// move to next file and simulate crash scenario while writing the first block
blockfileMgr.moveToNextFile()
partialBytesForNextBlock := append(
proto.EncodeVarint(uint64(10000)),
[]byte("partialBytesForNextBlock depicting a crash during first block in file")...,
)
blockfileMgr.currentFileWriter.append(partialBytesForNextBlock, true)
if deleteBlkfilesInfo {
err := blockfileMgr.db.Delete(blkMgrInfoKey, true)
require.NoError(t, err)
}
blkfileMgrWrapper.close()
// verify that the block file number 1 has been created with partial bytes as a side-effect of crash
lastFilePath := blockfileMgr.currentFileWriter.filePath
lastFileContent, err := ioutil.ReadFile(lastFilePath)
require.NoError(t, err)
require.Equal(t, lastFileContent, partialBytesForNextBlock)
// simulate reopen after crash
blkfileMgrWrapper = newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
// last block file (block file number 1) should have been truncated to zero length and concluded as the next file to append to
require.Equal(t, 0, testutilGetFileSize(t, lastFilePath))
require.Equal(t,
&blockfilesInfo{
latestFileNumber: 1,
latestFileSize: 0,
lastPersistedBlock: 4,
noBlockFiles: false,
},
blkfileMgrWrapper.blockfileMgr.blockfilesInfo,
)
// Add 5 more blocks and assert that they are added to last file (block file number 1) and full scanning across two files works as expected
blkfileMgrWrapper.addBlocks(blocks[5:])
require.True(t, testutilGetFileSize(t, lastFilePath) > 0)
require.Equal(t, firstBlkFileSize, testutilGetFileSize(t, firstFilePath))
blkfileMgrWrapper.testGetBlockByNumber(blocks)
testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 0, len(blocks)-1, blocks)
}
func testutilGetFileSize(t *testing.T, path string) int {
fi, err := os.Stat(path)
require.NoError(t, err)
return int(fi.Size())
}