/* Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ package broadcast import ( "io" "time" cb "github.com/hyperledger/fabric-protos-go/common" ab "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/orderer/common/msgprocessor" "github.com/pkg/errors" ) var logger = flogging.MustGetLogger("orderer.common.broadcast") //go:generate counterfeiter -o mock/channel_support_registrar.go --fake-name ChannelSupportRegistrar . ChannelSupportRegistrar // ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel type ChannelSupportRegistrar interface { // BroadcastChannelSupport returns the message channel header, whether the message is a config update // and the channel resources for a message or an error if the message is not a message which can // be processed directly (like CONFIG and ORDERER_TRANSACTION messages) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error) } //go:generate counterfeiter -o mock/channel_support.go --fake-name ChannelSupport . ChannelSupport // ChannelSupport provides the backing resources needed to support broadcast on a channel type ChannelSupport interface { msgprocessor.Processor Consenter } // Consenter provides methods to send messages through consensus type Consenter interface { // Order accepts a message or returns an error indicating the cause of failure // It ultimately passes through to the consensus.Chain interface Order(env *cb.Envelope, configSeq uint64) error // Configure accepts a reconfiguration or returns an error indicating the cause of failure // It ultimately passes through to the consensus.Chain interface Configure(config *cb.Envelope, configSeq uint64) error // WaitReady blocks waiting for consenter to be ready for accepting new messages. // This is useful when consenter needs to temporarily block ingress messages so // that in-flight messages can be consumed. It could return error if consenter is // in erroneous states. If this blocking behavior is not desired, consenter could // simply return nil. WaitReady() error } // Handler is designed to handle connections from Broadcast AB gRPC service type Handler struct { SupportRegistrar ChannelSupportRegistrar Metrics *Metrics } // Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { addr := util.ExtractRemoteAddress(srv.Context()) logger.Debugf("Starting new broadcast loop for %s", addr) for { msg, err := srv.Recv() if err == io.EOF { logger.Debugf("Received EOF from %s, hangup", addr) return nil } if err != nil { logger.Warningf("Error reading from %s: %s", addr, err) return err } resp := bh.ProcessMessage(msg, addr) err = srv.Send(resp) if resp.Status != cb.Status_SUCCESS { return err } if err != nil { logger.Warningf("Error sending to %s: %s", addr, err) return err } } } type MetricsTracker struct { ValidateStartTime time.Time EnqueueStartTime time.Time ValidateDuration time.Duration ChannelID string TxType string Metrics *Metrics } func (mt *MetricsTracker) Record(resp *ab.BroadcastResponse) { labels := []string{ "status", resp.Status.String(), "channel", mt.ChannelID, "type", mt.TxType, } if mt.ValidateDuration == 0 { mt.EndValidate() } mt.Metrics.ValidateDuration.With(labels...).Observe(mt.ValidateDuration.Seconds()) if mt.EnqueueStartTime != (time.Time{}) { enqueueDuration := time.Since(mt.EnqueueStartTime) mt.Metrics.EnqueueDuration.With(labels...).Observe(enqueueDuration.Seconds()) } mt.Metrics.ProcessedCount.With(labels...).Add(1) } func (mt *MetricsTracker) BeginValidate() { mt.ValidateStartTime = time.Now() } func (mt *MetricsTracker) EndValidate() { mt.ValidateDuration = time.Since(mt.ValidateStartTime) } func (mt *MetricsTracker) BeginEnqueue() { mt.EnqueueStartTime = time.Now() } // ProcessMessage validates and enqueues a single message func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) { tracker := &MetricsTracker{ ChannelID: "unknown", TxType: "unknown", Metrics: bh.Metrics, } defer func() { // This looks a little unnecessary, but if done directly as // a defer, resp gets the (always nil) current state of resp // and not the return value tracker.Record(resp) }() tracker.BeginValidate() chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg) if chdr != nil { tracker.ChannelID = chdr.ChannelId tracker.TxType = cb.HeaderType(chdr.Type).String() } if err != nil { logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err) return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()} } if !isConfig { logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type]) configSeq, err := processor.ProcessNormalMsg(msg) if err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()} } tracker.EndValidate() tracker.BeginEnqueue() if err = processor.WaitReady(); err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } err = processor.Order(msg, configSeq) if err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } } else { // isConfig logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr) config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) if err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()} } tracker.EndValidate() tracker.BeginEnqueue() if err = processor.WaitReady(); err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } err = processor.Configure(config, configSeq) if err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } } logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr) return &ab.BroadcastResponse{Status: cb.Status_SUCCESS} } // ClassifyError converts an error type into a status code. func ClassifyError(err error) cb.Status { switch errors.Cause(err) { case msgprocessor.ErrChannelDoesNotExist: return cb.Status_NOT_FOUND case msgprocessor.ErrPermissionDenied: return cb.Status_FORBIDDEN case msgprocessor.ErrMaintenanceMode: return cb.Status_SERVICE_UNAVAILABLE default: return cb.Status_BAD_REQUEST } }