648 lines
21 KiB
Go
648 lines
21 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package smartbft
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"fmt"
|
|
"reflect"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/SmartBFT-Go/consensus/pkg/api"
|
|
smartbft "github.com/SmartBFT-Go/consensus/pkg/consensus"
|
|
"github.com/SmartBFT-Go/consensus/pkg/metrics/disabled"
|
|
"github.com/SmartBFT-Go/consensus/pkg/types"
|
|
"github.com/SmartBFT-Go/consensus/pkg/wal"
|
|
"github.com/SmartBFT-Go/consensus/smartbftprotos"
|
|
"github.com/golang/protobuf/proto"
|
|
cb "github.com/hyperledger/fabric-protos-go/common"
|
|
"github.com/hyperledger/fabric-protos-go/msp"
|
|
"github.com/hyperledger/fabric/bccsp"
|
|
"github.com/hyperledger/fabric/common/flogging"
|
|
"github.com/hyperledger/fabric/common/policies"
|
|
"github.com/hyperledger/fabric/orderer/common/cluster"
|
|
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
|
|
types2 "github.com/hyperledger/fabric/orderer/common/types"
|
|
"github.com/hyperledger/fabric/orderer/consensus"
|
|
"github.com/hyperledger/fabric/protoutil"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller
|
|
|
|
// BlockPuller is used to pull blocks from other OSN
|
|
type BlockPuller interface {
|
|
PullBlock(seq uint64) *cb.Block
|
|
HeightsByEndpoints() (map[string]uint64, error)
|
|
Close()
|
|
}
|
|
|
|
// WALConfig consensus specific configuration parameters from orderer.yaml; for SmartBFT only WALDir is relevant.
|
|
type WALConfig struct {
|
|
WALDir string // WAL data of <my-channel> is stored in WALDir/<my-channel>
|
|
SnapDir string // Snapshots of <my-channel> are stored in SnapDir/<my-channel>
|
|
EvictionSuspicion string // Duration threshold that the node samples in order to suspect its eviction from the channel.
|
|
}
|
|
|
|
// ConfigValidator interface
|
|
type ConfigValidator interface {
|
|
ValidateConfig(env *cb.Envelope) error
|
|
}
|
|
|
|
type signerSerializer interface {
|
|
// Sign a message and return the signature over the digest, or error on failure
|
|
Sign(message []byte) ([]byte, error)
|
|
|
|
// Serialize converts an identity to bytes
|
|
Serialize() ([]byte, error)
|
|
}
|
|
|
|
// BFTChain implements Chain interface to wire with
|
|
// BFT smart library
|
|
type BFTChain struct {
|
|
RuntimeConfig *atomic.Value
|
|
Channel string
|
|
Config types.Configuration
|
|
BlockPuller BlockPuller
|
|
Comm cluster.Communicator
|
|
SignerSerializer signerSerializer
|
|
PolicyManager policies.Manager
|
|
Logger *flogging.FabricLogger
|
|
WALDir string
|
|
consensus *smartbft.Consensus
|
|
support consensus.ConsenterSupport
|
|
clusterService *cluster.ClusterService
|
|
verifier *Verifier
|
|
assembler *Assembler
|
|
Metrics *Metrics
|
|
bccsp bccsp.BCCSP
|
|
|
|
statusReportMutex sync.Mutex
|
|
consensusRelation types2.ConsensusRelation
|
|
status types2.Status
|
|
}
|
|
|
|
// NewChain creates new BFT Smart chain
|
|
func NewChain(
|
|
cv ConfigValidator,
|
|
selfID uint64,
|
|
config types.Configuration,
|
|
walDir string,
|
|
blockPuller BlockPuller,
|
|
comm cluster.Communicator,
|
|
signerSerializer signerSerializer,
|
|
policyManager policies.Manager,
|
|
support consensus.ConsenterSupport,
|
|
metrics *Metrics,
|
|
bccsp bccsp.BCCSP,
|
|
|
|
) (*BFTChain, error) {
|
|
requestInspector := &RequestInspector{
|
|
ValidateIdentityStructure: func(_ *msp.SerializedIdentity) error {
|
|
return nil
|
|
},
|
|
}
|
|
|
|
logger := flogging.MustGetLogger("orderer.consensus.smartbft.chain").With(zap.String("channel", support.ChannelID()))
|
|
|
|
c := &BFTChain{
|
|
RuntimeConfig: &atomic.Value{},
|
|
Channel: support.ChannelID(),
|
|
Config: config,
|
|
WALDir: walDir,
|
|
Comm: comm,
|
|
support: support,
|
|
SignerSerializer: signerSerializer,
|
|
PolicyManager: policyManager,
|
|
BlockPuller: blockPuller,
|
|
Logger: logger,
|
|
consensusRelation: types2.ConsensusRelationConsenter,
|
|
status: types2.StatusActive,
|
|
Metrics: &Metrics{
|
|
ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()),
|
|
CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()),
|
|
IsLeader: metrics.IsLeader.With("channel", support.ChannelID()),
|
|
LeaderID: metrics.LeaderID.With("channel", support.ChannelID()),
|
|
},
|
|
bccsp: bccsp,
|
|
}
|
|
|
|
lastBlock := LastBlockFromLedgerOrPanic(support, c.Logger)
|
|
lastConfigBlock := LastConfigBlockFromLedgerOrPanic(support, c.Logger)
|
|
|
|
rtc := RuntimeConfig{
|
|
logger: logger,
|
|
id: selfID,
|
|
}
|
|
rtc, err := rtc.BlockCommitted(lastConfigBlock, bccsp)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed constructing RuntimeConfig")
|
|
}
|
|
rtc, err = rtc.BlockCommitted(lastBlock, bccsp)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed constructing RuntimeConfig")
|
|
}
|
|
|
|
c.RuntimeConfig.Store(rtc)
|
|
|
|
c.verifier = buildVerifier(cv, c.RuntimeConfig, support, requestInspector, policyManager)
|
|
c.consensus = bftSmartConsensusBuild(c, requestInspector)
|
|
|
|
// Setup communication with list of remotes notes for the new channel
|
|
c.Comm.Configure(c.support.ChannelID(), rtc.RemoteNodes)
|
|
|
|
if err := c.consensus.ValidateConfiguration(rtc.Nodes); err != nil {
|
|
return nil, errors.Wrap(err, "failed to verify SmartBFT-Go configuration")
|
|
}
|
|
|
|
logger.Infof("SmartBFT-v3 is now servicing chain %s", support.ChannelID())
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func bftSmartConsensusBuild(
|
|
c *BFTChain,
|
|
requestInspector *RequestInspector,
|
|
) *smartbft.Consensus {
|
|
var err error
|
|
|
|
rtc := c.RuntimeConfig.Load().(RuntimeConfig)
|
|
|
|
latestMetadata, err := getViewMetadataFromBlock(rtc.LastBlock)
|
|
if err != nil {
|
|
c.Logger.Panicf("Failed extracting view metadata from ledger: %v", err)
|
|
}
|
|
|
|
var consensusWAL *wal.WriteAheadLogFile
|
|
var walInitState [][]byte
|
|
|
|
c.Logger.Infof("Initializing a WAL for chain %s, on dir: %s", c.support.ChannelID(), c.WALDir)
|
|
consensusWAL, walInitState, err = wal.InitializeAndReadAll(c.Logger, c.WALDir, wal.DefaultOptions())
|
|
if err != nil {
|
|
c.Logger.Panicf("failed to initialize a WAL for chain %s, err %s", c.support.ChannelID(), err)
|
|
}
|
|
|
|
clusterSize := uint64(len(rtc.Nodes))
|
|
|
|
// report cluster size
|
|
c.Metrics.ClusterSize.Set(float64(clusterSize))
|
|
|
|
sync := &Synchronizer{
|
|
selfID: rtc.id,
|
|
BlockToDecision: c.blockToDecision,
|
|
OnCommit: func(block *cb.Block) types.Reconfig {
|
|
c.pruneCommittedRequests(block)
|
|
return c.updateRuntimeConfig(block)
|
|
},
|
|
Support: c.support,
|
|
BlockPuller: c.BlockPuller,
|
|
ClusterSize: clusterSize,
|
|
Logger: c.Logger,
|
|
LatestConfig: func() (types.Configuration, []uint64) {
|
|
rtc := c.RuntimeConfig.Load().(RuntimeConfig)
|
|
return rtc.BFTConfig, rtc.Nodes
|
|
},
|
|
}
|
|
|
|
channelDecorator := zap.String("channel", c.support.ChannelID())
|
|
logger := flogging.MustGetLogger("orderer.consensus.smartbft.consensus").With(channelDecorator)
|
|
|
|
c.assembler = &Assembler{
|
|
RuntimeConfig: c.RuntimeConfig,
|
|
VerificationSeq: c.verifier.VerificationSequence,
|
|
Logger: flogging.MustGetLogger("orderer.consensus.smartbft.assembler").With(channelDecorator),
|
|
}
|
|
|
|
consensus := &smartbft.Consensus{
|
|
Config: c.Config,
|
|
Logger: logger,
|
|
Verifier: c.verifier,
|
|
Signer: &Signer{
|
|
ID: c.Config.SelfID,
|
|
Logger: flogging.MustGetLogger("orderer.consensus.smartbft.signer").With(channelDecorator),
|
|
SignerSerializer: c.SignerSerializer,
|
|
LastConfigBlockNum: func(block *cb.Block) uint64 {
|
|
if protoutil.IsConfigBlock(block) {
|
|
return block.Header.Number
|
|
}
|
|
|
|
return c.RuntimeConfig.Load().(RuntimeConfig).LastConfigBlock.Header.Number
|
|
},
|
|
},
|
|
MetricsProvider: api.NewCustomerProvider(&disabled.Provider{}),
|
|
Metadata: &smartbftprotos.ViewMetadata{
|
|
ViewId: latestMetadata.ViewId,
|
|
LatestSequence: latestMetadata.LatestSequence,
|
|
DecisionsInView: latestMetadata.DecisionsInView,
|
|
BlackList: latestMetadata.BlackList,
|
|
PrevCommitSignatureDigest: latestMetadata.PrevCommitSignatureDigest,
|
|
},
|
|
WAL: consensusWAL,
|
|
WALInitialContent: walInitState, // Read from WAL entries
|
|
Application: c,
|
|
Assembler: c.assembler,
|
|
RequestInspector: requestInspector,
|
|
Synchronizer: sync,
|
|
Comm: &Egress{
|
|
RuntimeConfig: c.RuntimeConfig,
|
|
Channel: c.support.ChannelID(),
|
|
Logger: flogging.MustGetLogger("orderer.consensus.smartbft.egress").With(channelDecorator),
|
|
RPC: &cluster.RPC{
|
|
Logger: flogging.MustGetLogger("orderer.consensus.smartbft.rpc").With(channelDecorator),
|
|
Channel: c.support.ChannelID(),
|
|
StreamsByType: cluster.NewStreamsByType(),
|
|
Comm: c.Comm,
|
|
Timeout: 5 * time.Minute, // Externalize configuration
|
|
},
|
|
},
|
|
Scheduler: time.NewTicker(time.Second).C,
|
|
ViewChangerTicker: time.NewTicker(time.Second).C,
|
|
}
|
|
|
|
proposal, signatures := c.lastPersistedProposalAndSignatures()
|
|
if proposal != nil {
|
|
consensus.LastProposal = *proposal
|
|
consensus.LastSignatures = signatures
|
|
}
|
|
|
|
return consensus
|
|
}
|
|
|
|
func (c *BFTChain) pruneCommittedRequests(block *cb.Block) {
|
|
workerNum := runtime.NumCPU()
|
|
|
|
var workers []*worker
|
|
|
|
for i := 0; i < workerNum; i++ {
|
|
workers = append(workers, &worker{
|
|
id: i,
|
|
work: block.Data.Data,
|
|
workerNum: workerNum,
|
|
f: func(tx []byte) {
|
|
ri := c.verifier.ReqInspector.RequestID(tx)
|
|
c.consensus.Pool.RemoveRequest(ri)
|
|
},
|
|
})
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(workers))
|
|
|
|
for i := 0; i < len(workers); i++ {
|
|
go func(w *worker) {
|
|
defer wg.Done()
|
|
w.doWork()
|
|
}(workers[i])
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (c *BFTChain) submit(env *cb.Envelope, configSeq uint64) error {
|
|
reqBytes, err := proto.Marshal(env)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to marshal request envelope")
|
|
}
|
|
|
|
c.Logger.Debugf("Consensus.SubmitRequest, node id %d", c.Config.SelfID)
|
|
if err := c.consensus.SubmitRequest(reqBytes); err != nil {
|
|
return errors.Wrapf(err, "failed to submit request")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Order accepts a message which has been processed at a given configSeq.
|
|
// If the configSeq advances, it is the responsibility of the consenter
|
|
// to revalidate and potentially discard the message
|
|
// The consenter may return an error, indicating the message was not accepted
|
|
func (c *BFTChain) Order(env *cb.Envelope, configSeq uint64) error {
|
|
seq := c.support.Sequence()
|
|
if configSeq < seq {
|
|
c.Logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", configSeq, seq)
|
|
if _, err := c.support.ProcessNormalMsg(env); err != nil {
|
|
return errors.Errorf("bad normal message: %s", err)
|
|
}
|
|
}
|
|
|
|
return c.submit(env, configSeq)
|
|
}
|
|
|
|
// Configure accepts a message which reconfigures the channel and will
|
|
// trigger an update to the configSeq if committed. The configuration must have
|
|
// been triggered by a ConfigUpdate message. If the config sequence advances,
|
|
// it is the responsibility of the consenter to recompute the resulting config,
|
|
// discarding the message if the reconfiguration is no longer valid.
|
|
// The consenter may return an error, indicating the message was not accepted
|
|
func (c *BFTChain) Configure(config *cb.Envelope, configSeq uint64) error {
|
|
// TODO: check configuration update validity
|
|
seq := c.support.Sequence()
|
|
if configSeq < seq {
|
|
c.Logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", configSeq, seq)
|
|
if configEnv, _, err := c.support.ProcessConfigMsg(config); err != nil {
|
|
return errors.Errorf("bad normal message: %s", err)
|
|
} else {
|
|
return c.submit(configEnv, configSeq)
|
|
}
|
|
}
|
|
return c.submit(config, configSeq)
|
|
}
|
|
|
|
// Deliver delivers proposal, writes block with transactions and metadata
|
|
func (c *BFTChain) Deliver(proposal types.Proposal, signatures []types.Signature) types.Reconfig {
|
|
block, err := ProposalToBlock(proposal)
|
|
if err != nil {
|
|
c.Logger.Panicf("failed to read proposal, err: %s", err)
|
|
}
|
|
|
|
var sigs []*cb.MetadataSignature
|
|
var ordererBlockMetadata []byte
|
|
|
|
var signers []uint64
|
|
|
|
for _, s := range signatures {
|
|
sig := &Signature{}
|
|
if err := sig.Unmarshal(s.Msg); err != nil {
|
|
c.Logger.Errorf("Failed unmarshaling signature from %d: %v", s.ID, err)
|
|
c.Logger.Errorf("Offending signature Msg: %s", base64.StdEncoding.EncodeToString(s.Msg))
|
|
c.Logger.Errorf("Offending signature Value: %s", base64.StdEncoding.EncodeToString(s.Value))
|
|
c.Logger.Errorf("Halting chain.")
|
|
c.Halt()
|
|
return types.Reconfig{}
|
|
}
|
|
|
|
if ordererBlockMetadata == nil {
|
|
ordererBlockMetadata = sig.OrdererBlockMetadata
|
|
}
|
|
|
|
sigs = append(sigs, &cb.MetadataSignature{
|
|
// AuxiliaryInput: sig.AuxiliaryInput,
|
|
Signature: s.Value,
|
|
// We do not put a signature header when we commit the block.
|
|
// Instead, we put the nonce and the identifier and at validation
|
|
// we reconstruct the signature header at runtime.
|
|
// SignatureHeader: sig.SignatureHeader,
|
|
// Nonce: sig.Nonce,
|
|
// SignerId: s.ID,
|
|
IdentifierHeader: sig.IdentifierHeader,
|
|
})
|
|
|
|
signers = append(signers, s.ID)
|
|
}
|
|
|
|
block.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES] = protoutil.MarshalOrPanic(&cb.Metadata{
|
|
Value: ordererBlockMetadata,
|
|
Signatures: sigs,
|
|
})
|
|
|
|
var mdTotalSize int
|
|
for _, md := range block.Metadata.Metadata {
|
|
mdTotalSize += len(md)
|
|
}
|
|
|
|
c.Logger.Infof("Delivering proposal, writing block %d with %d transactions and metadata of total size %d with signatures from %v to the ledger, node id %d",
|
|
block.Header.Number,
|
|
len(block.Data.Data),
|
|
mdTotalSize,
|
|
signers,
|
|
c.Config.SelfID)
|
|
c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number)) // report the committed block number
|
|
c.reportIsLeader() // report the leader
|
|
if protoutil.IsConfigBlock(block) {
|
|
c.support.WriteConfigBlock(block, nil)
|
|
} else {
|
|
c.support.WriteBlock(block, nil)
|
|
}
|
|
|
|
reconfig := c.updateRuntimeConfig(block)
|
|
return reconfig
|
|
}
|
|
|
|
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
|
|
// This is useful when consenter needs to temporarily block ingress messages so
|
|
// that in-flight messages can be consumed. It could return error if consenter is
|
|
// in erroneous states. If this blocking behavior is not desired, consenter could
|
|
// simply return nil.
|
|
func (c *BFTChain) WaitReady() error {
|
|
return nil
|
|
}
|
|
|
|
// Errored returns a channel which will close when an error has occurred.
|
|
// This is especially useful for the Deliver client, who must terminate waiting
|
|
// clients when the consenter is not up to date.
|
|
func (c *BFTChain) Errored() <-chan struct{} {
|
|
// TODO: Implement Errored
|
|
return nil
|
|
}
|
|
|
|
// Start should allocate whatever resources are needed for staying up to date with the chain.
|
|
// Typically, this involves creating a thread which reads from the ordering source, passes those
|
|
// messages to a block cutter, and writes the resulting blocks to the ledger.
|
|
func (c *BFTChain) Start() {
|
|
if err := c.consensus.Start(); err != nil {
|
|
c.Logger.Panicf("Failed to start chain, aborting: %+v", err)
|
|
}
|
|
c.reportIsLeader() // report the leader
|
|
}
|
|
|
|
// Halt frees the resources which were allocated for this Chain.
|
|
func (c *BFTChain) Halt() {
|
|
c.Logger.Infof("Shutting down chain")
|
|
c.consensus.Stop()
|
|
}
|
|
|
|
func (c *BFTChain) blockToProposalWithoutSignaturesInMetadata(block *cb.Block) types.Proposal {
|
|
blockClone := proto.Clone(block).(*cb.Block)
|
|
if len(blockClone.Metadata.Metadata) > int(cb.BlockMetadataIndex_SIGNATURES) {
|
|
signatureMetadata := &cb.Metadata{}
|
|
// Nil out signatures because we carry them around separately in the library format.
|
|
if err := proto.Unmarshal(blockClone.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES], signatureMetadata); err != nil {
|
|
// nothing to do
|
|
c.Logger.Errorf("Error unmarshalling signature metadata from block: %s", err)
|
|
}
|
|
signatureMetadata.Signatures = nil
|
|
blockClone.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES] = protoutil.MarshalOrPanic(signatureMetadata)
|
|
}
|
|
prop := types.Proposal{
|
|
Header: protoutil.BlockHeaderBytes(blockClone.Header),
|
|
Payload: (&ByteBufferTuple{
|
|
A: protoutil.MarshalOrPanic(blockClone.Data),
|
|
B: protoutil.MarshalOrPanic(blockClone.Metadata),
|
|
}).ToBytes(),
|
|
VerificationSequence: int64(c.verifier.VerificationSequence()),
|
|
}
|
|
|
|
if protoutil.IsConfigBlock(block) {
|
|
prop.VerificationSequence--
|
|
}
|
|
|
|
return prop
|
|
}
|
|
|
|
func (c *BFTChain) blockToDecision(block *cb.Block) *types.Decision {
|
|
proposal := c.blockToProposalWithoutSignaturesInMetadata(block)
|
|
if block.Header.Number == 0 {
|
|
return &types.Decision{
|
|
Proposal: proposal,
|
|
}
|
|
}
|
|
|
|
signatureMetadata := &cb.Metadata{}
|
|
if err := proto.Unmarshal(block.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES], signatureMetadata); err != nil {
|
|
c.Logger.Panicf("Failed unmarshaling signatures from block metadata: %v", err)
|
|
}
|
|
|
|
ordererMDFromBlock := &cb.OrdererBlockMetadata{}
|
|
if err := proto.Unmarshal(signatureMetadata.Value, ordererMDFromBlock); err != nil {
|
|
c.Logger.Panicf("Failed unmarshaling OrdererBlockMetadata from block signature metadata: %v", err)
|
|
}
|
|
|
|
proposal.Metadata = ordererMDFromBlock.ConsenterMetadata
|
|
|
|
var signatures []types.Signature
|
|
for _, sigMD := range signatureMetadata.Signatures {
|
|
idHdr := &cb.IdentifierHeader{}
|
|
if err := proto.Unmarshal(sigMD.IdentifierHeader, idHdr); err != nil {
|
|
c.Logger.Panicf("Failed unmarshaling identifier header for %s", base64.StdEncoding.EncodeToString(sigMD.IdentifierHeader))
|
|
}
|
|
sig := &Signature{
|
|
IdentifierHeader: sigMD.IdentifierHeader,
|
|
BlockHeader: protoutil.BlockHeaderBytes(block.Header),
|
|
OrdererBlockMetadata: signatureMetadata.Value,
|
|
}
|
|
signatures = append(signatures, types.Signature{
|
|
Msg: sig.Marshal(),
|
|
Value: sigMD.Signature,
|
|
ID: uint64(idHdr.Identifier),
|
|
})
|
|
}
|
|
|
|
return &types.Decision{
|
|
Signatures: signatures,
|
|
Proposal: proposal,
|
|
}
|
|
}
|
|
|
|
// HandleMessage handles the message from the sender
|
|
func (c *BFTChain) HandleMessage(sender uint64, m *smartbftprotos.Message) {
|
|
c.Logger.Debugf("Message from %d", sender)
|
|
c.consensus.HandleMessage(sender, m)
|
|
}
|
|
|
|
// HandleRequest handles the request from the sender
|
|
func (c *BFTChain) HandleRequest(sender uint64, req []byte) {
|
|
c.Logger.Debugf("HandleRequest from %d", sender)
|
|
if _, err := c.verifier.VerifyRequest(req); err != nil {
|
|
c.Logger.Warnf("Got bad request from %d: %v", sender, err)
|
|
return
|
|
}
|
|
c.consensus.SubmitRequest(req)
|
|
}
|
|
|
|
func (c *BFTChain) updateRuntimeConfig(block *cb.Block) types.Reconfig {
|
|
prevRTC := c.RuntimeConfig.Load().(RuntimeConfig)
|
|
newRTC, err := prevRTC.BlockCommitted(block, c.bccsp)
|
|
if err != nil {
|
|
c.Logger.Errorf("Failed constructing RuntimeConfig from block %d, halting chain", block.Header.Number)
|
|
c.Halt()
|
|
return types.Reconfig{}
|
|
}
|
|
c.RuntimeConfig.Store(newRTC)
|
|
if protoutil.IsConfigBlock(block) {
|
|
c.Comm.Configure(c.Channel, newRTC.RemoteNodes)
|
|
c.clusterService.ConfigureNodeCerts(c.Channel, newRTC.consenters)
|
|
}
|
|
|
|
membershipDidNotChange := reflect.DeepEqual(newRTC.Nodes, prevRTC.Nodes)
|
|
configDidNotChange := reflect.DeepEqual(newRTC.BFTConfig, prevRTC.BFTConfig)
|
|
noChangeDetected := membershipDidNotChange && configDidNotChange
|
|
return types.Reconfig{
|
|
InLatestDecision: !noChangeDetected,
|
|
CurrentNodes: newRTC.Nodes,
|
|
CurrentConfig: newRTC.BFTConfig,
|
|
}
|
|
}
|
|
|
|
func (c *BFTChain) lastPersistedProposalAndSignatures() (*types.Proposal, []types.Signature) {
|
|
lastBlock := LastBlockFromLedgerOrPanic(c.support, c.Logger)
|
|
// initial report of the last committed block number
|
|
c.Metrics.CommittedBlockNumber.Set(float64(lastBlock.Header.Number))
|
|
decision := c.blockToDecision(lastBlock)
|
|
return &decision.Proposal, decision.Signatures
|
|
}
|
|
|
|
func (c *BFTChain) reportIsLeader() {
|
|
leaderID := c.consensus.GetLeaderID()
|
|
c.Metrics.LeaderID.Set(float64(leaderID))
|
|
|
|
if leaderID == c.Config.SelfID {
|
|
c.Metrics.IsLeader.Set(1)
|
|
} else {
|
|
c.Metrics.IsLeader.Set(0)
|
|
}
|
|
}
|
|
|
|
// StatusReport returns the ConsensusRelation & Status
|
|
func (c *BFTChain) StatusReport() (types2.ConsensusRelation, types2.Status) {
|
|
c.statusReportMutex.Lock()
|
|
defer c.statusReportMutex.Unlock()
|
|
|
|
return c.consensusRelation, c.status
|
|
}
|
|
|
|
func buildVerifier(
|
|
cv ConfigValidator,
|
|
runtimeConfig *atomic.Value,
|
|
support consensus.ConsenterSupport,
|
|
requestInspector *RequestInspector,
|
|
policyManager policies.Manager,
|
|
) *Verifier {
|
|
channelDecorator := zap.String("channel", support.ChannelID())
|
|
logger := flogging.MustGetLogger("orderer.consensus.smartbft.verifier").With(channelDecorator)
|
|
return &Verifier{
|
|
ConfigValidator: cv,
|
|
VerificationSequencer: support,
|
|
ReqInspector: requestInspector,
|
|
Logger: logger,
|
|
RuntimeConfig: runtimeConfig,
|
|
ConsenterVerifier: &consenterVerifier{
|
|
logger: logger,
|
|
channel: support.ChannelID(),
|
|
policyManager: policyManager,
|
|
},
|
|
|
|
AccessController: &chainACL{
|
|
policyManager: policyManager,
|
|
Logger: logger,
|
|
},
|
|
Ledger: support,
|
|
}
|
|
}
|
|
|
|
type chainACL struct {
|
|
policyManager policies.Manager
|
|
Logger *flogging.FabricLogger
|
|
}
|
|
|
|
// Evaluate evaluates signed data
|
|
func (c *chainACL) Evaluate(signatureSet []*protoutil.SignedData) error {
|
|
policy, ok := c.policyManager.GetPolicy(policies.ChannelWriters)
|
|
if !ok {
|
|
return fmt.Errorf("could not find policy %s", policies.ChannelWriters)
|
|
}
|
|
|
|
err := policy.EvaluateSignedData(signatureSet)
|
|
if err != nil {
|
|
c.Logger.Debugf("SigFilter evaluation failed: %s, policyName: %s", err.Error(), policies.ChannelWriters)
|
|
return errors.Wrap(errors.WithStack(msgprocessor.ErrPermissionDenied), err.Error())
|
|
}
|
|
return nil
|
|
}
|