go_study/fabric-main/common/ledger/blkstorage/blocks_itr.go

97 lines
2.8 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blkstorage
import (
"sync"
"github.com/hyperledger/fabric/common/ledger"
)
// blocksItr - an iterator for iterating over a sequence of blocks
type blocksItr struct {
mgr *blockfileMgr
maxBlockNumAvailable uint64
blockNumToRetrieve uint64
stream *blockStream
closeMarker bool
closeMarkerLock *sync.Mutex
}
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
mgr.blkfilesInfoCond.L.Lock()
defer mgr.blkfilesInfoCond.L.Unlock()
return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}}
}
func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
itr.mgr.blkfilesInfoCond.L.Lock()
defer itr.mgr.blkfilesInfoCond.L.Unlock()
for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() {
logger.Debugf("Going to wait for newer blocks. maxAvailaBlockNumber=[%d], waitForBlockNum=[%d]",
itr.mgr.blockfilesInfo.lastPersistedBlock, blockNum)
itr.mgr.blkfilesInfoCond.Wait()
logger.Debugf("Came out of wait. maxAvailaBlockNumber=[%d]", itr.mgr.blockfilesInfo.lastPersistedBlock)
}
return itr.mgr.blockfilesInfo.lastPersistedBlock
}
func (itr *blocksItr) initStream() error {
var lp *fileLocPointer
var err error
if lp, err = itr.mgr.index.getBlockLocByBlockNum(itr.blockNumToRetrieve); err != nil {
return err
}
if itr.stream, err = newBlockStream(itr.mgr.rootDir, lp.fileSuffixNum, int64(lp.offset), -1); err != nil {
return err
}
return nil
}
func (itr *blocksItr) shouldClose() bool {
itr.closeMarkerLock.Lock()
defer itr.closeMarkerLock.Unlock()
return itr.closeMarker
}
// Next moves the cursor to next block and returns true iff the iterator is not exhausted
func (itr *blocksItr) Next() (ledger.QueryResult, error) {
if itr.maxBlockNumAvailable < itr.blockNumToRetrieve {
itr.maxBlockNumAvailable = itr.waitForBlock(itr.blockNumToRetrieve)
}
itr.closeMarkerLock.Lock()
defer itr.closeMarkerLock.Unlock()
if itr.closeMarker {
return nil, nil
}
if itr.stream == nil {
logger.Debugf("Initializing block stream for iterator. itr.maxBlockNumAvailable=%d", itr.maxBlockNumAvailable)
if err := itr.initStream(); err != nil {
return nil, err
}
}
nextBlockBytes, err := itr.stream.nextBlockBytes()
if err != nil {
return nil, err
}
itr.blockNumToRetrieve++
return deserializeBlock(nextBlockBytes)
}
// Close releases any resources held by the iterator
func (itr *blocksItr) Close() {
itr.mgr.blkfilesInfoCond.L.Lock()
defer itr.mgr.blkfilesInfoCond.L.Unlock()
itr.closeMarkerLock.Lock()
defer itr.closeMarkerLock.Unlock()
itr.closeMarker = true
itr.mgr.blkfilesInfoCond.Broadcast()
if itr.stream != nil {
itr.stream.close()
}
}