/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package blockcutter

import (
	"time"

	cb "github.com/hyperledger/fabric-protos-go/common"
	"github.com/hyperledger/fabric/common/channelconfig"
	"github.com/hyperledger/fabric/common/flogging"
)

var logger = flogging.MustGetLogger("orderer.common.blockcutter")

// OrdererConfigFetcher fetches the current orderer configuration for the
// channel, which supplies the BatchSize parameters consulted when cutting blocks.
type OrdererConfigFetcher interface {
	OrdererConfig() (channelconfig.Orderer, bool)
}

// Receiver defines a sink for the ordered broadcast messages
type Receiver interface {
	// Ordered should be invoked sequentially as messages are ordered
	// Each batch in `messageBatches` will be wrapped into a block.
	// `pending` indicates if there are still messages pending in the receiver.
	Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool)

	// Cut returns the current batch and starts a new one
	Cut() []*cb.Envelope
}
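
// exampleDriveReceiver is an illustrative sketch, not part of the production
// orderer: it shows how a caller might drive a Receiver, assuming a
// hypothetical createNextBlock callback that wraps one batch into one block.
// Fabric's consenters follow a similar loop and additionally arm a timer with
// the channel's BatchTimeout whenever `pending` is true, calling Cut when it
// expires so that slow traffic is not held back indefinitely.
func exampleDriveReceiver(r Receiver, msgs <-chan *cb.Envelope, createNextBlock func([]*cb.Envelope)) {
	for msg := range msgs {
		batches, pending := r.Ordered(msg)
		for _, batch := range batches {
			// Each batch returned by Ordered becomes exactly one block.
			createNextBlock(batch)
		}
		if pending {
			// Messages remain buffered in the receiver; a real consenter would
			// (re)start its batch timer here and flush with Cut on expiry.
			continue
		}
		// Nothing pending: a real consenter would stop its batch timer here.
	}
}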

// receiver is the Receiver implementation returned by NewReceiverImpl.
type receiver struct {
	sharedConfigFetcher   OrdererConfigFetcher
	pendingBatch          []*cb.Envelope
	pendingBatchSizeBytes uint32

	PendingBatchStartTime time.Time
	ChannelID             string
	Metrics               *Metrics
}

// NewReceiverImpl creates a Receiver implementation that uses the supplied
// OrdererConfigFetcher to look up the channel's batch size parameters.
func NewReceiverImpl(channelID string, sharedConfigFetcher OrdererConfigFetcher, metrics *Metrics) Receiver {
	return &receiver{
		sharedConfigFetcher: sharedConfigFetcher,
		Metrics:             metrics,
		ChannelID:           channelID,
	}
}

// Ordered should be invoked sequentially as messages are ordered
//
// messageBatches length: 0, pending: false
// - impossible, as we have just received a message
//
// messageBatches length: 0, pending: true
// - no batch is cut and there are messages pending
//
// messageBatches length: 1, pending: false
// - the message count reaches BatchSize.MaxMessageCount
//
// messageBatches length: 1, pending: true
// - the current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
//
// messageBatches length: 2, pending: false
// - the current message size in bytes exceeds BatchSize.PreferredMaxBytes, therefore it is isolated in its own batch.
//
// messageBatches length: 2, pending: true
// - impossible
//
// Note that messageBatches can not be greater than 2.
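//
// As an illustration only (hypothetical numbers, not from any real channel
// configuration), with MaxMessageCount = 3, PreferredMaxBytes = 100, and an
// initially empty pending batch:
//
//	Ordered(40-byte msg)  -> 0 batches, pending: true   (pending batch: 1 msg, 40 bytes)
//	Ordered(50-byte msg)  -> 0 batches, pending: true   (pending batch: 2 msgs, 90 bytes)
//	Ordered(30-byte msg)  -> 1 batch,   pending: true   (adding would exceed 100 bytes, so the 2-message batch is cut first)
//	Ordered(200-byte msg) -> 2 batches, pending: false  (pending batch cut, then the oversized message isolated in its own batch)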
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
	if len(r.pendingBatch) == 0 {
		// We are beginning a new batch, mark the time
		r.PendingBatchStartTime = time.Now()
	}

	ordererConfig, ok := r.sharedConfigFetcher.OrdererConfig()
	if !ok {
		logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
	}

	batchSize := ordererConfig.BatchSize()

	messageSizeBytes := messageSizeBytes(msg)
	if messageSizeBytes > batchSize.PreferredMaxBytes {
		logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, batchSize.PreferredMaxBytes)

		// cut pending batch, if it has any messages
		if len(r.pendingBatch) > 0 {
			messageBatch := r.Cut()
			messageBatches = append(messageBatches, messageBatch)
		}

		// create new batch with single message
		messageBatches = append(messageBatches, []*cb.Envelope{msg})

		// Record that this batch took no time to fill
		r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)

		return
	}

	messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > batchSize.PreferredMaxBytes

	if messageWillOverflowBatchSizeBytes {
		logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
		logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
		messageBatch := r.Cut()
		r.PendingBatchStartTime = time.Now()
		messageBatches = append(messageBatches, messageBatch)
	}

	logger.Debugf("Enqueuing message into batch")
	r.pendingBatch = append(r.pendingBatch, msg)
	r.pendingBatchSizeBytes += messageSizeBytes
	pending = true

	if uint32(len(r.pendingBatch)) >= batchSize.MaxMessageCount {
		logger.Debugf("Batch size met, cutting batch")
		messageBatch := r.Cut()
		messageBatches = append(messageBatches, messageBatch)
		pending = false
	}

	return
}

// Cut returns the current batch and starts a new one
func (r *receiver) Cut() []*cb.Envelope {
	if r.pendingBatch != nil {
		r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
	}
	r.PendingBatchStartTime = time.Time{}
	batch := r.pendingBatch
	r.pendingBatch = nil
	r.pendingBatchSizeBytes = 0
	return batch
}
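
// A minimal sketch of the timeout path, assuming the caller owns a timer channel
// armed with the channel's BatchTimeout (batchTimer, r, and createNextBlock are
// hypothetical names, not identifiers from this package):
//
//	case <-batchTimer:
//		if batch := r.Cut(); len(batch) > 0 {
//			createNextBlock(batch)
//		}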

// messageSizeBytes returns the size of an envelope as the sum of its payload
// and signature lengths; this is the measure compared against
// BatchSize.PreferredMaxBytes when deciding whether to cut a batch.
func messageSizeBytes(message *cb.Envelope) uint32 {
	return uint32(len(message.Payload) + len(message.Signature))
}