424 lines
14 KiB
Go
424 lines
14 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package deliver
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"math"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
cb "github.com/hyperledger/fabric-protos-go/common"
|
|
ab "github.com/hyperledger/fabric-protos-go/orderer"
|
|
"github.com/hyperledger/fabric/common/crypto"
|
|
"github.com/hyperledger/fabric/common/flogging"
|
|
"github.com/hyperledger/fabric/common/ledger/blockledger"
|
|
"github.com/hyperledger/fabric/common/policies"
|
|
"github.com/hyperledger/fabric/common/util"
|
|
"github.com/hyperledger/fabric/protoutil"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// logger is the package-level logger, registered under the name
// "common.deliver".
var logger = flogging.MustGetLogger("common.deliver")
|
|
|
|
//go:generate counterfeiter -o mock/chain_manager.go -fake-name ChainManager . ChainManager
|
|
|
|
// ChainManager provides a way for the Handler to look up the Chain.
|
|
type ChainManager interface {
|
|
GetChain(chainID string) Chain
|
|
}
|
|
|
|
//go:generate counterfeiter -o mock/chain.go -fake-name Chain . Chain
|
|
|
|
// Chain encapsulates chain operations and data.
|
|
type Chain interface {
|
|
// Sequence returns the current config sequence number, can be used to detect config changes
|
|
Sequence() uint64
|
|
|
|
// PolicyManager returns the current policy manager as specified by the chain configuration
|
|
PolicyManager() policies.Manager
|
|
|
|
// Reader returns the chain Reader for the chain
|
|
Reader() blockledger.Reader
|
|
|
|
// Errored returns a channel which closes when the backing consenter has errored
|
|
Errored() <-chan struct{}
|
|
}
|
|
|
|
//go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker
|
|
|
|
// PolicyChecker checks the envelope against the policy logic supplied by the
|
|
// function.
|
|
type PolicyChecker interface {
|
|
CheckPolicy(envelope *cb.Envelope, channelID string) error
|
|
}
|
|
|
|
// The PolicyCheckerFunc is an adapter that allows the use of an ordinary
|
|
// function as a PolicyChecker.
|
|
type PolicyCheckerFunc func(envelope *cb.Envelope, channelID string) error
|
|
|
|
// CheckPolicy calls pcf(envelope, channelID)
|
|
func (pcf PolicyCheckerFunc) CheckPolicy(envelope *cb.Envelope, channelID string) error {
|
|
return pcf(envelope, channelID)
|
|
}
|
|
|
|
//go:generate counterfeiter -o mock/inspector.go -fake-name Inspector . Inspector
|
|
|
|
// Inspector verifies an appropriate binding between the message and the context.
|
|
type Inspector interface {
|
|
Inspect(context.Context, proto.Message) error
|
|
}
|
|
|
|
// The InspectorFunc is an adapter that allows the use of an ordinary
|
|
// function as an Inspector.
|
|
type InspectorFunc func(context.Context, proto.Message) error
|
|
|
|
// Inspect calls inspector(ctx, p)
|
|
func (inspector InspectorFunc) Inspect(ctx context.Context, p proto.Message) error {
|
|
return inspector(ctx, p)
|
|
}
|
|
|
|
// Handler handles server requests.
|
|
type Handler struct {
|
|
ExpirationCheckFunc func(identityBytes []byte) time.Time
|
|
ChainManager ChainManager
|
|
TimeWindow time.Duration
|
|
BindingInspector Inspector
|
|
Metrics *Metrics
|
|
}
|
|
|
|
//go:generate counterfeiter -o mock/receiver.go -fake-name Receiver . Receiver
|
|
|
|
// Receiver is used to receive enveloped seek requests.
|
|
type Receiver interface {
|
|
Recv() (*cb.Envelope, error)
|
|
}
|
|
|
|
//go:generate counterfeiter -o mock/response_sender.go -fake-name ResponseSender . ResponseSender
|
|
|
|
// ResponseSender defines the interface a handler must implement to send
|
|
// responses.
|
|
type ResponseSender interface {
|
|
// SendStatusResponse sends completion status to the client.
|
|
SendStatusResponse(status cb.Status) error
|
|
// SendBlockResponse sends the block and optionally private data to the client.
|
|
SendBlockResponse(data *cb.Block, channelID string, chain Chain, signedData *protoutil.SignedData) error
|
|
// DataType returns the data type sent by the sender
|
|
DataType() string
|
|
}
|
|
|
|
// Filtered is an optional marker interface: a response sender implements
// it to indicate whether it is configured to send filtered blocks.
//
// Note: this is replaced by the "data_type" label. It is kept for now
// until we decide how to take care of the compatibility issue.
type Filtered interface {
	// IsFiltered reports whether filtered blocks are being sent.
	IsFiltered() bool
}
|
|
|
|
// Server is a polymorphic structure to support generalization of this handler
|
|
// to be able to deliver different type of responses.
|
|
type Server struct {
|
|
Receiver
|
|
PolicyChecker
|
|
ResponseSender
|
|
}
|
|
|
|
// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header.
|
|
func ExtractChannelHeaderCertHash(msg proto.Message) []byte {
|
|
chdr, isChannelHeader := msg.(*cb.ChannelHeader)
|
|
if !isChannelHeader || chdr == nil {
|
|
return nil
|
|
}
|
|
return chdr.TlsCertHash
|
|
}
|
|
|
|
// NewHandler creates an implementation of the Handler interface.
|
|
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics, expirationCheckDisabled bool) *Handler {
|
|
expirationCheck := crypto.ExpiresAt
|
|
if expirationCheckDisabled {
|
|
expirationCheck = noExpiration
|
|
}
|
|
return &Handler{
|
|
ChainManager: cm,
|
|
TimeWindow: timeWindow,
|
|
BindingInspector: InspectorFunc(NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
|
|
Metrics: metrics,
|
|
ExpirationCheckFunc: expirationCheck,
|
|
}
|
|
}
|
|
|
|
// Handle receives incoming deliver requests.
|
|
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
|
|
addr := util.ExtractRemoteAddress(ctx)
|
|
logger.Debugf("Starting new deliver loop for %s", addr)
|
|
h.Metrics.StreamsOpened.Add(1)
|
|
defer h.Metrics.StreamsClosed.Add(1)
|
|
for {
|
|
logger.Debugf("Attempting to read seek info message from %s", addr)
|
|
envelope, err := srv.Recv()
|
|
if err == io.EOF {
|
|
logger.Debugf("Received EOF from %s, hangup", addr)
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
logger.Warningf("Error reading from %s: %s", addr, err)
|
|
return err
|
|
}
|
|
|
|
status, err := h.deliverBlocks(ctx, srv, envelope)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = srv.SendStatusResponse(status)
|
|
if status != cb.Status_SUCCESS {
|
|
return err
|
|
}
|
|
if err != nil {
|
|
logger.Warningf("Error sending to %s: %s", addr, err)
|
|
return err
|
|
}
|
|
|
|
logger.Debugf("Waiting for new SeekInfo from %s", addr)
|
|
}
|
|
}
|
|
|
|
func isFiltered(srv *Server) bool {
|
|
if filtered, ok := srv.ResponseSender.(Filtered); ok {
|
|
return filtered.IsFiltered()
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
|
|
addr := util.ExtractRemoteAddress(ctx)
|
|
payload, chdr, shdr, err := h.parseEnvelope(ctx, envelope)
|
|
if err != nil {
|
|
logger.Warningf("error parsing envelope from %s: %s", addr, err)
|
|
return cb.Status_BAD_REQUEST, nil
|
|
}
|
|
|
|
chain := h.ChainManager.GetChain(chdr.ChannelId)
|
|
if chain == nil {
|
|
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
|
|
// So we would expect our log to be somewhat flooded with these
|
|
logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
|
|
return cb.Status_NOT_FOUND, nil
|
|
}
|
|
|
|
labels := []string{
|
|
"channel", chdr.ChannelId,
|
|
"filtered", strconv.FormatBool(isFiltered(srv)),
|
|
"data_type", srv.DataType(),
|
|
}
|
|
h.Metrics.RequestsReceived.With(labels...).Add(1)
|
|
defer func() {
|
|
labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
|
|
h.Metrics.RequestsCompleted.With(labels...).Add(1)
|
|
}()
|
|
|
|
seekInfo := &ab.SeekInfo{}
|
|
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
|
|
logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
|
|
return cb.Status_BAD_REQUEST, nil
|
|
}
|
|
|
|
erroredChan := chain.Errored()
|
|
if seekInfo.ErrorResponse == ab.SeekInfo_BEST_EFFORT {
|
|
// In a 'best effort' delivery of blocks, we should ignore consenter errors
|
|
// and continue to deliver blocks according to the client's request.
|
|
erroredChan = nil
|
|
}
|
|
select {
|
|
case <-erroredChan:
|
|
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
|
|
return cb.Status_SERVICE_UNAVAILABLE, nil
|
|
default:
|
|
}
|
|
|
|
accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, h.ExpirationCheckFunc)
|
|
if err != nil {
|
|
logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
|
|
return cb.Status_BAD_REQUEST, nil
|
|
}
|
|
|
|
if err := accessControl.Evaluate(); err != nil {
|
|
logger.Warningf("[channel: %s] Client %s is not authorized: %s", chdr.ChannelId, addr, err)
|
|
return cb.Status_FORBIDDEN, nil
|
|
}
|
|
|
|
if seekInfo.Start == nil || seekInfo.Stop == nil {
|
|
logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
|
|
return cb.Status_BAD_REQUEST, nil
|
|
}
|
|
|
|
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
|
|
|
|
cursor, number := chain.Reader().Iterator(seekInfo.Start)
|
|
defer cursor.Close()
|
|
var stopNum uint64
|
|
switch stop := seekInfo.Stop.Type.(type) {
|
|
case *ab.SeekPosition_Oldest:
|
|
stopNum = number
|
|
case *ab.SeekPosition_Newest:
|
|
// when seeking only the newest block (i.e. starting
|
|
// and stopping at newest), don't reevaluate the ledger
|
|
// height as this can lead to multiple blocks being
|
|
// sent when only one is expected
|
|
if proto.Equal(seekInfo.Start, seekInfo.Stop) {
|
|
stopNum = number
|
|
break
|
|
}
|
|
stopNum = chain.Reader().Height() - 1
|
|
case *ab.SeekPosition_Specified:
|
|
stopNum = stop.Specified.Number
|
|
if stopNum < number {
|
|
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
|
|
return cb.Status_BAD_REQUEST, nil
|
|
}
|
|
}
|
|
|
|
for {
|
|
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
|
|
if number > chain.Reader().Height()-1 {
|
|
logger.Warningf("[channel: %s] Block %d not found, block number greater than chain length bounds", chdr.ChannelId, number)
|
|
return cb.Status_NOT_FOUND, nil
|
|
}
|
|
}
|
|
|
|
var block *cb.Block
|
|
var status cb.Status
|
|
|
|
iterCh := make(chan struct{})
|
|
go func() {
|
|
block, status = cursor.Next()
|
|
close(iterCh)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Debugf("Context canceled, aborting wait for next block")
|
|
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
|
|
case <-erroredChan:
|
|
// TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports
|
|
// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
|
|
logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
|
|
return cb.Status_SERVICE_UNAVAILABLE, nil
|
|
case <-iterCh:
|
|
// Iterator has set the block and status vars
|
|
}
|
|
|
|
if status != cb.Status_SUCCESS {
|
|
logger.Warningf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
|
|
return status, nil
|
|
}
|
|
|
|
// increment block number to support FAIL_IF_NOT_READY deliver behavior
|
|
number++
|
|
|
|
if err := accessControl.Evaluate(); err != nil {
|
|
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
|
|
return cb.Status_FORBIDDEN, nil
|
|
}
|
|
|
|
logger.Debugf("[channel: %s] Delivering block [%d] for (%p) for %s", chdr.ChannelId, block.Header.Number, seekInfo, addr)
|
|
|
|
if seekInfo.ContentType == ab.SeekInfo_HEADER_WITH_SIG {
|
|
block.Data = nil
|
|
}
|
|
|
|
signedData := &protoutil.SignedData{Data: envelope.Payload, Identity: shdr.Creator, Signature: envelope.Signature}
|
|
if err := srv.SendBlockResponse(block, chdr.ChannelId, chain, signedData); err != nil {
|
|
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
|
|
return cb.Status_INTERNAL_SERVER_ERROR, err
|
|
}
|
|
|
|
h.Metrics.BlocksSent.With(labels...).Add(1)
|
|
|
|
if stopNum == block.Header.Number {
|
|
break
|
|
}
|
|
}
|
|
|
|
logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
|
|
|
|
return cb.Status_SUCCESS, nil
|
|
}
|
|
|
|
func (h *Handler) parseEnvelope(ctx context.Context, envelope *cb.Envelope) (*cb.Payload, *cb.ChannelHeader, *cb.SignatureHeader, error) {
|
|
payload, err := protoutil.UnmarshalPayload(envelope.Payload)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
if payload.Header == nil {
|
|
return nil, nil, nil, errors.New("envelope has no header")
|
|
}
|
|
|
|
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
shdr, err := protoutil.UnmarshalSignatureHeader(payload.Header.SignatureHeader)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
err = h.validateChannelHeader(ctx, chdr)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
return payload, chdr, shdr, nil
|
|
}
|
|
|
|
func (h *Handler) validateChannelHeader(ctx context.Context, chdr *cb.ChannelHeader) error {
|
|
if chdr.GetTimestamp() == nil {
|
|
err := errors.New("channel header in envelope must contain timestamp")
|
|
return err
|
|
}
|
|
|
|
envTime := time.Unix(chdr.GetTimestamp().Seconds, int64(chdr.GetTimestamp().Nanos)).UTC()
|
|
serverTime := time.Now()
|
|
|
|
if math.Abs(float64(serverTime.UnixNano()-envTime.UnixNano())) > float64(h.TimeWindow.Nanoseconds()) {
|
|
err := errors.Errorf("envelope timestamp %s is more than %s apart from current server time %s", envTime, h.TimeWindow, serverTime)
|
|
return err
|
|
}
|
|
|
|
err := h.BindingInspector.Inspect(ctx, chdr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func noExpiration(_ []byte) time.Time {
|
|
return time.Time{}
|
|
}
|
|
|
|
func (h *Handler) HandleAttestation(ctx context.Context, srv *Server, env *cb.Envelope) error {
|
|
status, err := h.deliverBlocks(ctx, srv, env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = srv.SendStatusResponse(status)
|
|
if status != cb.Status_SUCCESS {
|
|
return err
|
|
}
|
|
if err != nil {
|
|
addr := util.ExtractRemoteAddress(ctx)
|
|
logger.Warningf("Error sending to %s: %s", addr, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|