/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package state

import (
	"bytes"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric-protos-go/common"
	proto "github.com/hyperledger/fabric-protos-go/gossip"
	"github.com/hyperledger/fabric-protos-go/ledger/rwset"
	"github.com/hyperledger/fabric-protos-go/peer"
	"github.com/hyperledger/fabric-protos-go/transientstore"
	vsccErrors "github.com/hyperledger/fabric/common/errors"
	"github.com/hyperledger/fabric/gossip/api"
	"github.com/hyperledger/fabric/gossip/comm"
	common2 "github.com/hyperledger/fabric/gossip/common"
	"github.com/hyperledger/fabric/gossip/discovery"
	"github.com/hyperledger/fabric/gossip/metrics"
	"github.com/hyperledger/fabric/gossip/protoext"
	"github.com/hyperledger/fabric/gossip/util"
	"github.com/hyperledger/fabric/protoutil"
	"github.com/pkg/errors"
)

// GossipStateProvider is the interface used to acquire sequences of ledger blocks.
// It is capable of filling in missing blocks by running state replication and
// sending requests for missing blocks to other nodes.
type GossipStateProvider interface {
	// AddPayload adds a block payload into the state provider's buffer
	AddPayload(payload *proto.Payload) error

	// Stop terminates the state transfer object
	Stop()
}

const (
	stragglerWarningThreshold = 100
	defAntiEntropyBatchSize   = 10
	defMaxBlockDistance       = 20

	blocking    = true
	nonBlocking = false

	enqueueRetryInterval = time.Millisecond * 100
)

// Configuration keeps state transfer configuration parameters
type Configuration struct {
	AntiEntropyInterval             time.Duration
	AntiEntropyStateResponseTimeout time.Duration
	AntiEntropyBatchSize            uint64
	MaxBlockDistance                int
	AntiEntropyMaxRetries           int
	ChannelBufferSize               int
	EnableStateTransfer             bool
}

// GossipAdapter defines the gossip/communication interface required by the state provider
type GossipAdapter interface {
	// Send sends a message to remote peers
	Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)

	// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
	// If passThrough is false, the messages are processed by the gossip layer beforehand.
	// If passThrough is true, the gossip layer doesn't intervene and the messages
	// can be used to send a reply back to the sender.
	Accept(acceptor common2.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan protoext.ReceivedMessage)

	// UpdateLedgerHeight updates the ledger height the peer
	// publishes to other peers in the channel
	UpdateLedgerHeight(height uint64, channelID common2.ChannelID)

	// PeersOfChannel returns the NetworkMembers considered alive
	// that are also subscribed to the given channel
	PeersOfChannel(common2.ChannelID) []discovery.NetworkMember
}

// MCSAdapter is an adapter of the message crypto service interface that binds
// the specific APIs required by the state transfer service
type MCSAdapter interface {
	// VerifyBlock returns nil if the block is properly signed and the claimed seqNum is the
	// sequence number that the block's header contains; otherwise it returns an error.
	VerifyBlock(channelID common2.ChannelID, seqNum uint64, signedBlock *common.Block) error

	// VerifyByChannel checks that the signature is a valid signature of the message
	// under a peer's verification key, in the context of a specific channel.
	// If the verification succeeds, VerifyByChannel returns nil, meaning no error occurred.
	// If peerIdentity is nil, the verification fails.
	VerifyByChannel(channelID common2.ChannelID, peerIdentity api.PeerIdentityType, signature, message []byte) error
}

// ledgerResources defines abilities that the ledger provides
type ledgerResources interface {
	// StoreBlock delivers a new block together with its underlying private data
	// to the ledger for commit
	StoreBlock(block *common.Block, data util.PvtDataCollections) error

	// StorePvtData is used to persist private data into the transient store
	StorePvtData(txid string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blckHeight uint64) error

	// GetPvtDataAndBlockByNum gets a block by number and also returns all related private data
	// that the requesting peer is eligible for.
	// The order of private data in the slice of PvtDataCollections doesn't imply the order of
	// the transactions in the block these private data relate to; to get the correct placement,
	// read the TxPvtData.SeqInBlock field.
	GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo protoutil.SignedData) (*common.Block, util.PvtDataCollections, error)

	// LedgerHeight returns the current ledger height
	LedgerHeight() (uint64, error)

	// Close closes the ledgerResources
	Close()
}

// ServicesMediator is an aggregated adapter that bundles all the mediators
// required by state transfer into a single struct
type ServicesMediator struct {
	GossipAdapter
	MCSAdapter
}

// GossipStateProviderImpl is the implementation of the GossipStateProvider interface.
// The struct maintains an in-memory sliding window of
// new ledger blocks to be committed into the ledger.
type GossipStateProviderImpl struct {
	logger util.Logger

	// Chain id
	chainID string

	mediator *ServicesMediator

	// Queue of payloads which have not been committed to the ledger yet
	payloads PayloadsBuffer

	ledger ledgerResources

	stateResponseCh chan protoext.ReceivedMessage

	stateRequestCh chan protoext.ReceivedMessage

	stopCh chan struct{}

	once sync.Once

	stateTransferActive int32

	stateMetrics *metrics.StateMetrics

	requestValidator *stateRequestValidator

	blockingMode bool

	config *StateConfig
}

// stateRequestValidator facilitates validation of the state request messages
type stateRequestValidator struct{}

// validate checks for RemoteStateRequest message validity
func (v *stateRequestValidator) validate(request *proto.RemoteStateRequest, batchSize uint64) error {
	if request.StartSeqNum > request.EndSeqNum {
		return errors.Errorf("Invalid sequence interval [%d...%d).", request.StartSeqNum, request.EndSeqNum)
	}

	if request.EndSeqNum > batchSize+request.StartSeqNum {
		return errors.Errorf("Requesting blocks range [%d-%d) greater than configured allowed"+
			" (%d) batching size for anti-entropy.", request.StartSeqNum, request.EndSeqNum, batchSize)
	}
	return nil
}

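// As an illustration of the bounds enforced above (the numbers are examples,
// not values taken from any deployment): with a batch size of 10, a request
// for [5...15] passes validation because EndSeqNum does not exceed
// StartSeqNum+batchSize, a request for [5...16] is rejected for exceeding the
// allowed batch size, and a request for [7...3] is rejected because the
// interval is inverted.
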
// NewGossipStateProvider creates a state provider with a coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipStateProvider(
	logger util.Logger,
	chainID string,
	services *ServicesMediator,
	ledger ledgerResources,
	stateMetrics *metrics.StateMetrics,
	blockingMode bool,
	config *StateConfig,
) GossipStateProvider {
	gossipChan, _ := services.Accept(func(message interface{}) bool {
		// Get only data messages
		return protoext.IsDataMsg(message.(*proto.GossipMessage)) &&
			bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
	}, false)

	remoteStateMsgFilter := func(message interface{}) bool {
		receivedMsg := message.(protoext.ReceivedMessage)
		msg := receivedMsg.GetGossipMessage()
		if !(protoext.IsRemoteStateMessage(msg.GossipMessage) || msg.GetPrivateData() != nil) {
			return false
		}
		// Ensure we deal only with messages that belong to this channel
		if !bytes.Equal(msg.Channel, []byte(chainID)) {
			return false
		}
		connInfo := receivedMsg.GetConnectionInfo()
		authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
		if authErr != nil {
			logger.Warning("Got unauthorized request from", string(connInfo.Identity))
			return false
		}
		return true
	}

	// Filter messages which are only relevant for state transfer
	_, commChan := services.Accept(remoteStateMsgFilter, true)

	height, err := ledger.LedgerHeight()
	if err != nil {
		logger.Error("Could not read ledger info to obtain current ledger height due to: ", errors.WithStack(err))
		// Exiting as without the ledger it will be impossible
		// to deliver new blocks
		return nil
	}

	if height == 0 {
		// Panic here since this is an indication of an invalid situation which should not happen in a normal
		// code path.
		logger.Panic("Committer height cannot be zero, ledger should include at least one block (genesis).")
	}

	s := &GossipStateProviderImpl{
		logger: logger,
		// MessageCryptoService
		mediator: services,
		// Chain ID
		chainID: chainID,
		// Create a queue for payloads, wrapped in a metrics buffer
		payloads: &metricsBuffer{
			PayloadsBuffer: NewPayloadsBuffer(height),
			sizeMetrics:    stateMetrics.PayloadBufferSize,
			chainID:        chainID,
		},
		ledger:              ledger,
		stateResponseCh:     make(chan protoext.ReceivedMessage, config.StateChannelSize),
		stateRequestCh:      make(chan protoext.ReceivedMessage, config.StateChannelSize),
		stopCh:              make(chan struct{}),
		stateTransferActive: 0,
		once:                sync.Once{},
		stateMetrics:        stateMetrics,
		requestValidator:    &stateRequestValidator{},
		blockingMode:        blockingMode,
		config:              config,
	}

	logger.Infof("Updating metadata information for channel %s, "+
		"current ledger sequence is at = %d, next expected block is = %d", chainID, height-1, s.payloads.Next())
	logger.Debug("Updating gossip ledger height to", height)
	services.UpdateLedgerHeight(height, common2.ChannelID(s.chainID))

	// Listen for incoming communication
	go s.receiveAndQueueGossipMessages(gossipChan)
	go s.receiveAndDispatchDirectMessages(commChan)
	// Deliver in-order messages into the incoming channel
	go s.deliverPayloads()
	if s.config.StateEnabled {
		// Execute anti entropy to fill missing gaps
		go s.antiEntropy()
	}
	// Taking care of state request messages
	go s.processStateRequests()

	return s
}

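// A minimal construction sketch for the provider (purely illustrative: the
// adapter values and the StateConfig numbers below are assumptions, not
// production wiring, and StateConfig may carry additional fields defined
// elsewhere in this package):
//
//	cfg := &StateConfig{
//		StateCheckInterval:   10 * time.Second,
//		StateResponseTimeout: 3 * time.Second,
//		StateBatchSize:       defAntiEntropyBatchSize,
//		StateBlockBufferSize: defMaxBlockDistance,
//		StateMaxRetries:      3,
//		StateChannelSize:     100,
//		StateEnabled:         true,
//	}
//	provider := NewGossipStateProvider(logger, chainID, mediator, ledger, stateMetrics, blocking, cfg)
//	defer provider.Stop()
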
func (s *GossipStateProviderImpl) receiveAndQueueGossipMessages(ch <-chan *proto.GossipMessage) {
	for msg := range ch {
		s.logger.Debug("Received new message via gossip channel")
		go func(msg *proto.GossipMessage) {
			if !bytes.Equal(msg.Channel, []byte(s.chainID)) {
				s.logger.Warning("Received enqueue for channel",
					string(msg.Channel), "while expecting channel", s.chainID, "ignoring enqueue")
				return
			}

			dataMsg := msg.GetDataMsg()
			if dataMsg != nil {
				if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil {
					s.logger.Warningf("Block [%d] received from gossip wasn't added to payload buffer: %v", dataMsg.Payload.SeqNum, err)
					return
				}
			} else {
				s.logger.Debug("Gossip message received is not of data message type, usually this should not happen.")
			}
		}(msg)
	}
}

func (s *GossipStateProviderImpl) receiveAndDispatchDirectMessages(ch <-chan protoext.ReceivedMessage) {
	for msg := range ch {
		s.logger.Debug("Dispatching a message", msg)
		go func(msg protoext.ReceivedMessage) {
			gm := msg.GetGossipMessage()
			// Check type of the message
			if protoext.IsRemoteStateMessage(gm.GossipMessage) {
				s.logger.Debug("Handling direct state transfer message")
				// Got state transfer request response
				s.directMessage(msg)
			} else if gm.GetPrivateData() != nil {
				s.logger.Debug("Handling private data collection message")
				// Handling private data replication message
				s.privateDataMessage(msg)
			}
		}(msg)
	}
}

func (s *GossipStateProviderImpl) privateDataMessage(msg protoext.ReceivedMessage) {
	if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) {
		s.logger.Warning("Received state transfer request for channel",
			string(msg.GetGossipMessage().Channel), "while expecting channel", s.chainID, "skipping request...")
		return
	}

	gossipMsg := msg.GetGossipMessage()
	pvtDataMsg := gossipMsg.GetPrivateData()

	if pvtDataMsg.Payload == nil {
		s.logger.Warning("Malformed private data message, no payload provided")
		return
	}

	collectionName := pvtDataMsg.Payload.CollectionName
	txID := pvtDataMsg.Payload.TxId
	pvtRwSet := pvtDataMsg.Payload.PrivateRwset

	if len(pvtRwSet) == 0 {
		s.logger.Warning("Malformed private data message, no rwset provided, collection name = ", collectionName)
		return
	}

	txPvtRwSet := &rwset.TxPvtReadWriteSet{
		DataModel: rwset.TxReadWriteSet_KV,
		NsPvtRwset: []*rwset.NsPvtReadWriteSet{
			{
				Namespace: pvtDataMsg.Payload.Namespace,
				CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
					CollectionName: collectionName,
					Rwset:          pvtRwSet,
				}},
			},
		},
	}

	txPvtRwSetWithConfig := &transientstore.TxPvtReadWriteSetWithConfigInfo{
		PvtRwset: txPvtRwSet,
		CollectionConfigs: map[string]*peer.CollectionConfigPackage{
			pvtDataMsg.Payload.Namespace: pvtDataMsg.Payload.CollectionConfigs,
		},
	}

	if err := s.ledger.StorePvtData(txID, txPvtRwSetWithConfig, pvtDataMsg.Payload.PrivateSimHeight); err != nil {
		s.logger.Errorf("Wasn't able to persist private data for collection %s, due to %s", collectionName, err)
		msg.Ack(err) // Sending NACK to indicate failure of storing collection
		return
	}

	msg.Ack(nil)
	s.logger.Debug("Private data for collection", collectionName, "has been stored")
}

func (s *GossipStateProviderImpl) directMessage(msg protoext.ReceivedMessage) {
	s.logger.Debug("[ENTER] -> directMessage")
	defer s.logger.Debug("[EXIT] -> directMessage")

	if msg == nil {
		s.logger.Error("Got nil message via end-to-end channel, should not happen!")
		return
	}

	if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) {
		s.logger.Warning("Received state transfer request for channel",
			string(msg.GetGossipMessage().Channel), "while expecting channel", s.chainID, "skipping request...")
		return
	}

	incoming := msg.GetGossipMessage()

	if incoming.GetStateRequest() != nil {
		if len(s.stateRequestCh) < s.config.StateChannelSize {
			// Forward the state request to the channel; if there are too
			// many state request messages, ignore it to avoid flooding.
			s.stateRequestCh <- msg
		}
	} else if incoming.GetStateResponse() != nil {
		// If no state transfer procedure is active there is
		// no reason to process the message
		if atomic.LoadInt32(&s.stateTransferActive) == 1 {
			// Send signal of state response message
			s.stateResponseCh <- msg
		}
	}
}

func (s *GossipStateProviderImpl) processStateRequests() {
	for {
		msg, stillOpen := <-s.stateRequestCh
		if !stillOpen {
			return
		}
		s.handleStateRequest(msg)
	}
}

// handleStateRequest handles a state request message: it validates the requested batch size,
// reads the requested blocks from the local ledger, builds a response message and sends it back.
func (s *GossipStateProviderImpl) handleStateRequest(msg protoext.ReceivedMessage) {
	if msg == nil {
		return
	}
	request := msg.GetGossipMessage().GetStateRequest()

	if err := s.requestValidator.validate(request, s.config.StateBatchSize); err != nil {
		s.logger.Errorf("State request validation failed, %s. Ignoring request...", err)
		return
	}

	currentHeight, err := s.ledger.LedgerHeight()
	if err != nil {
		s.logger.Errorf("Cannot access current ledger height, due to %+v", err)
		return
	}
	if currentHeight < request.EndSeqNum {
		s.logger.Warningf("Received state request to transfer blocks with sequence numbers [%d...%d] higher "+
			"than available in the ledger (%d)", request.StartSeqNum, request.EndSeqNum, currentHeight)
	}

	endSeqNum := min(currentHeight, request.EndSeqNum)

	response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)}
	for seqNum := request.StartSeqNum; seqNum <= endSeqNum; seqNum++ {
		s.logger.Debug("Reading block ", seqNum, " with private data from the coordinator service")
		connInfo := msg.GetConnectionInfo()
		peerAuthInfo := protoutil.SignedData{
			Data:      connInfo.Auth.SignedData,
			Signature: connInfo.Auth.Signature,
			Identity:  connInfo.Identity,
		}
		block, pvtData, err := s.ledger.GetPvtDataAndBlockByNum(seqNum, peerAuthInfo)
		if err != nil {
			s.logger.Errorf("cannot read block number %d from ledger, because %+v, skipping...", seqNum, err)
			continue
		}

		if block == nil {
			s.logger.Errorf("Wasn't able to read block with sequence number %d from ledger, skipping....", seqNum)
			continue
		}

		blockBytes, err := pb.Marshal(block)
		if err != nil {
			s.logger.Errorf("Could not marshal block: %+v", errors.WithStack(err))
			continue
		}

		var pvtBytes [][]byte
		if pvtData != nil {
			// Marshal private data
			pvtBytes, err = pvtData.Marshal()
			if err != nil {
				s.logger.Errorf("Failed to marshal private rwset for block %d due to %+v", seqNum, errors.WithStack(err))
				continue
			}
		}

		// Appending result to the response
		response.Payloads = append(response.Payloads, &proto.Payload{
			SeqNum:      seqNum,
			Data:        blockBytes,
			PrivateData: pvtBytes,
		})
	}
	// Sending back response with missing blocks
	msg.Respond(&proto.GossipMessage{
		// Copy nonce field from the request, so it will be possible to match the response
		Nonce:   msg.GetGossipMessage().Nonce,
		Tag:     proto.GossipMessage_CHAN_OR_ORG,
		Channel: []byte(s.chainID),
		Content: &proto.GossipMessage_StateResponse{StateResponse: response},
	})
}

func (s *GossipStateProviderImpl) handleStateResponse(msg protoext.ReceivedMessage) (uint64, error) {
	max := uint64(0)
	// Send signal that response for given nonce has been received
	response := msg.GetGossipMessage().GetStateResponse()
	// Extract payloads, verify and push into buffer
	if len(response.GetPayloads()) == 0 {
		return uint64(0), errors.New("Received state transfer response without payload")
	}
	for _, payload := range response.GetPayloads() {
		s.logger.Debugf("Received payload with sequence number %d.", payload.SeqNum)
		block, err := protoutil.UnmarshalBlock(payload.Data)
		if err != nil {
			s.logger.Warningf("Error unmarshalling payload to block for sequence number %d, due to %+v", payload.SeqNum, err)
			return uint64(0), err
		}

		if err := s.mediator.VerifyBlock(common2.ChannelID(s.chainID), payload.SeqNum, block); err != nil {
			err = errors.WithStack(err)
			s.logger.Warningf("Error verifying block with sequence number %d, due to %+v", payload.SeqNum, err)
			return uint64(0), err
		}
		if max < payload.SeqNum {
			max = payload.SeqNum
		}

		err = s.addPayload(payload, blocking)
		if err != nil {
			s.logger.Warningf("Block [%d] received from block transfer wasn't added to payload buffer: %v", payload.SeqNum, err)
		}
	}
	return max, nil
}

// Stop sends a halting signal to all goroutines
func (s *GossipStateProviderImpl) Stop() {
	// Make sure stop won't be executed twice
	// and the stop channel won't be used again
	s.once.Do(func() {
		close(s.stopCh)
		// Close all resources
		s.ledger.Close()
		close(s.stateRequestCh)
		close(s.stateResponseCh)
	})
}

func (s *GossipStateProviderImpl) deliverPayloads() {
	for {
		select {
		// Wait for notification that the next seq has arrived
		case <-s.payloads.Ready():
			s.logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())
			// Collect all subsequent payloads
			for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
				rawBlock := &common.Block{}
				if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
					s.logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
					continue
				}
				if rawBlock.Data == nil || rawBlock.Header == nil {
					s.logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
						payload.SeqNum, rawBlock.Header, rawBlock.Data)
					continue
				}
				s.logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, payload.SeqNum, len(rawBlock.Data.Data))

				// Read all private data into a slice
				var p util.PvtDataCollections
				if payload.PrivateData != nil {
					err := p.Unmarshal(payload.PrivateData)
					if err != nil {
						s.logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
						continue
					}
				}
				if err := s.commitBlock(rawBlock, p); err != nil {
					if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
						s.logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
						return
					}
					s.logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
				}
			}
		case <-s.stopCh:
			s.logger.Debug("State provider has been stopped, stopping to push new blocks.")
			return
		}
	}
}

func (s *GossipStateProviderImpl) antiEntropy() {
	defer s.logger.Debug("State Provider stopped, stopping anti entropy procedure.")

	for {
		select {
		case <-s.stopCh:
			return
		case <-time.After(s.config.StateCheckInterval):
			ourHeight, err := s.ledger.LedgerHeight()
			if err != nil {
				// Unable to read from the ledger, continue to the next round
				s.logger.Errorf("Cannot obtain ledger height, due to %+v", errors.WithStack(err))
				continue
			}
			if ourHeight == 0 {
				s.logger.Error("Ledger reported block height of 0 but this should be impossible")
				continue
			}
			maxHeight := s.maxAvailableLedgerHeight()
			if ourHeight >= maxHeight {
				continue
			}

			s.requestBlocksInRange(ourHeight, maxHeight-1)
		}
	}
}

// maxAvailableLedgerHeight iterates over all available peers and checks advertised meta state to
// find maximum available ledger height across peers
func (s *GossipStateProviderImpl) maxAvailableLedgerHeight() uint64 {
	max := uint64(0)
	for _, p := range s.mediator.PeersOfChannel(common2.ChannelID(s.chainID)) {
		if p.Properties == nil {
			s.logger.Debug("Peer", p.PreferredEndpoint(), "doesn't have properties, skipping it")
			continue
		}
		peerHeight := p.Properties.LedgerHeight
		if max < peerHeight {
			max = peerHeight
		}
	}
	return max
}

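// A worked example of one anti-entropy round (the numbers are illustrative):
// if our ledger height is 10, the highest height advertised by peers is 25,
// and StateBatchSize is 10, antiEntropy calls requestBlocksInRange(10, 24).
// The first request asks for blocks [10...20]; the responding peer serves the
// range inclusively (capped at its own ledger height), handleStateResponse
// reports 20 as the highest sequence number added to the buffer, and the next
// request therefore asks for blocks [21...24].
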
// requestBlocksInRange acquires blocks with sequence
// numbers in the range [start...end] (inclusive).
func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) {
	atomic.StoreInt32(&s.stateTransferActive, 1)
	defer atomic.StoreInt32(&s.stateTransferActive, 0)

	for prev := start; prev <= end; {
		next := min(end, prev+s.config.StateBatchSize)

		gossipMsg := s.stateRequestMessage(prev, next)

		responseReceived := false
		tryCounts := 0

		for !responseReceived {
			if tryCounts > s.config.StateMaxRetries {
				s.logger.Warningf("Wasn't able to get blocks in range [%d...%d], after %d retries",
					prev, next, tryCounts)
				return
			}
			// Select a peer to ask for blocks
			peer, err := s.selectPeerToRequestFrom(next)
			if err != nil {
				s.logger.Warningf("Cannot send state request for blocks in range [%d...%d], due to %+v",
					prev, next, errors.WithStack(err))
				return
			}

			s.logger.Debugf("State transfer, with peer %s, requesting blocks in range [%d...%d], "+
				"for chainID %s", peer.Endpoint, prev, next, s.chainID)

			s.mediator.Send(gossipMsg, peer)
			tryCounts++

			// Wait until timeout or response arrival
			select {
			case msg, stillOpen := <-s.stateResponseCh:
				if !stillOpen {
					return
				}
				if msg.GetGossipMessage().Nonce != gossipMsg.Nonce {
					continue
				}
				// Got the corresponding response for the state request, can continue
				index, err := s.handleStateResponse(msg)
				if err != nil {
					s.logger.Warningf("Wasn't able to process state response for "+
						"blocks [%d...%d], due to %+v", prev, next, errors.WithStack(err))
					continue
				}
				prev = index + 1
				responseReceived = true
			case <-time.After(s.config.StateResponseTimeout):
			}
		}
	}
}

// stateRequestMessage generates state request message for given blocks in range [beginSeq...endSeq]
func (s *GossipStateProviderImpl) stateRequestMessage(beginSeq uint64, endSeq uint64) *proto.GossipMessage {
	return &proto.GossipMessage{
		Nonce:   util.RandomUInt64(),
		Tag:     proto.GossipMessage_CHAN_OR_ORG,
		Channel: []byte(s.chainID),
		Content: &proto.GossipMessage_StateRequest{
			StateRequest: &proto.RemoteStateRequest{
				StartSeqNum: beginSeq,
				EndSeqNum:   endSeq,
			},
		},
	}
}

// selectPeerToRequestFrom selects a peer which has the required blocks, to ask for the missing blocks from
func (s *GossipStateProviderImpl) selectPeerToRequestFrom(height uint64) (*comm.RemotePeer, error) {
	// Filter peers which possess the required range of missing blocks
	peers := s.filterPeers(s.hasRequiredHeight(height))

	n := len(peers)
	if n == 0 {
		return nil, errors.New("there are no peers to ask for missing blocks from")
	}

	// Select a peer to ask for blocks
	return peers[util.RandomInt(n)], nil
}

// filterPeers returns a list of peers that satisfy the provided predicate
func (s *GossipStateProviderImpl) filterPeers(predicate func(peer discovery.NetworkMember) bool) []*comm.RemotePeer {
	var peers []*comm.RemotePeer

	for _, member := range s.mediator.PeersOfChannel(common2.ChannelID(s.chainID)) {
		if predicate(member) {
			peers = append(peers, &comm.RemotePeer{Endpoint: member.PreferredEndpoint(), PKIID: member.PKIid})
		}
	}

	return peers
}

// hasRequiredHeight returns a predicate that filters peers whose advertised ledger height
// is at least the height indicated by the provided input parameter
func (s *GossipStateProviderImpl) hasRequiredHeight(height uint64) func(peer discovery.NetworkMember) bool {
	return func(peer discovery.NetworkMember) bool {
		if peer.Properties != nil {
			return peer.Properties.LedgerHeight >= height
		}
		s.logger.Debug(peer.PreferredEndpoint(), "doesn't have properties")
		return false
	}
}

// AddPayload adds new payload into state.
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
	return s.addPayload(payload, s.blockingMode)
}

// addPayload adds a new payload into the state. It may (or may not) block according to the
// given parameter. If it gets a block while in blocking mode, it will wait until
// the block is pushed into the payloads buffer.
// Otherwise it may drop the block if the payload buffer is too full.
func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error {
	if payload == nil {
		return errors.New("Given payload is nil")
	}
	s.logger.Debugf("[%s] Adding payload to local buffer, blockNum = [%d]", s.chainID, payload.SeqNum)
	height, err := s.ledger.LedgerHeight()
	if err != nil {
		return errors.Wrap(err, "Failed obtaining ledger height")
	}

	if !blockingMode && payload.SeqNum-height >= uint64(s.config.StateBlockBufferSize) {
		if s.straggler(height, payload) {
			s.logger.Warningf("[%s] Current block height (%d) is too far behind other peers at height (%d) to be able to receive blocks "+
				"without state transfer which is disabled in the configuration "+
				"(peer.gossip.state.enabled = false). Consider enabling it or setting the peer explicitly to be a leader (peer.gossip.orgLeader = true) "+
				"in order to pull blocks directly from the ordering service.",
				s.chainID, height, payload.SeqNum+1)
		}
		return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
	}

	for blockingMode && s.payloads.Size() > s.config.StateBlockBufferSize*2 {
		time.Sleep(enqueueRetryInterval)
	}

	s.payloads.Push(payload)
	s.logger.Debugf("Blocks payloads buffer size for channel [%s] is %d blocks", s.chainID, s.payloads.Size())
	return nil
}

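// For illustration of the non-blocking drop rule above (the numbers are made up):
// with a ledger height of 100 and a StateBlockBufferSize of 20, a non-blocking add
// of a block with SeqNum 119 is enqueued, while a block with SeqNum 120 or higher
// is rejected because it is too far ahead of the ledger.
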
func (s *GossipStateProviderImpl) straggler(currHeight uint64, receivedPayload *proto.Payload) bool {
	// If state transfer is disabled, there is no way to request blocks from peers whose ledger has advanced too far.
	stateDisabled := !s.config.StateEnabled
	// We are too far behind if we received a block with a sequence number more than stragglerWarningThreshold ahead of our height.
	tooFarBehind := currHeight+stragglerWarningThreshold < receivedPayload.SeqNum
	// We depend on other peers for blocks if we use leader election, or we are not explicitly configured to be an org leader.
	peerDependent := s.config.UseLeaderElection || !s.config.OrgLeader
	return stateDisabled && tooFarBehind && peerDependent
}

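// A numeric illustration of the straggler check (values chosen for the example):
// with stragglerWarningThreshold = 100, a peer at height 5 that receives a block
// with sequence number 200 while state transfer is disabled, and that relies on
// other peers for blocks (leader election in use, or not an org leader), is
// reported as a straggler; receiving block 90 would not trigger the warning.
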
func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {
	t1 := time.Now()

	// Commit block with available private transactions
	if err := s.ledger.StoreBlock(block, pvtData); err != nil {
		s.logger.Errorf("Got error while committing (%+v)", errors.WithStack(err))
		return err
	}

	sinceT1 := time.Since(t1)
	s.stateMetrics.CommitDuration.With("channel", s.chainID).Observe(sinceT1.Seconds())

	// Update ledger height
	s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChannelID(s.chainID))
	s.logger.Debugf("[%s] Committed block [%d] with %d transaction(s)",
		s.chainID, block.Header.Number, len(block.Data.Data))

	s.stateMetrics.Height.With("channel", s.chainID).Set(float64(block.Header.Number + 1))

	return nil
}

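// min below computes the minimum of two uint64 values without branching:
// when a < b, a-b underflows and (for any realistic difference between block
// sequence numbers) has its top bit set, so shifting right by 63 yields 1 and
// negating it produces an all-ones mask; b^(a^b) then evaluates to a. When
// a >= b the mask is zero and the expression evaluates to b. A plain
// equivalent (shown only for clarity) would be:
//
//	if a < b {
//		return a
//	}
//	return b
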
func min(a uint64, b uint64) uint64 {
	return b ^ ((a ^ b) & (-(uint64(a-b) >> 63)))
}