/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package cluster

import (
	"bytes"
	"context"
	"crypto/x509"
	"encoding/pem"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric-protos-go/orderer"
	"github.com/hyperledger/fabric/common/flogging"
	"github.com/hyperledger/fabric/common/util"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

const (
	// MinimumExpirationWarningInterval is the default minimum time interval
	// between consecutive warnings about certificate expiration.
	MinimumExpirationWarningInterval = time.Minute * 5
)

var (
	errOverflow = errors.New("send queue overflown")
	errAborted  = errors.New("aborted")
	errTimeout  = errors.New("rpc timeout expired")
)

// MembersByChannel is a mapping from channel name
// to MemberMapping
type MembersByChannel map[string]MemberMapping

// Comm implements Communicator
type Comm struct {
	MinimumExpirationWarningInterval time.Duration
	CertExpWarningThreshold          time.Duration
	shutdownSignal                   chan struct{}
	shutdown                         bool
	SendBufferSize                   int
	Lock                             sync.RWMutex
	Logger                           *flogging.FabricLogger
	ChanExt                          ChannelExtractor
	H                                Handler
	Connections                      *ConnectionStore
	Chan2Members                     MembersByChannel
	Metrics                          *Metrics
	CompareCertificate               CertificateComparator
}

type requestContext struct {
	channel string
	sender  uint64
}

// DispatchSubmit identifies the channel and sender of the submit request and passes it
// to the underlying Handler
func (c *Comm) DispatchSubmit(ctx context.Context, request *orderer.SubmitRequest) error {
	reqCtx, err := c.requestContext(ctx, request)
	if err != nil {
		return err
	}
	return c.H.OnSubmit(reqCtx.channel, reqCtx.sender, request)
}

// DispatchConsensus identifies the channel and sender of the step request and passes it
// to the underlying Handler
func (c *Comm) DispatchConsensus(ctx context.Context, request *orderer.ConsensusRequest) error {
	reqCtx, err := c.requestContext(ctx, request)
	if err != nil {
		return err
	}
	return c.H.OnConsensus(reqCtx.channel, reqCtx.sender, request)
}
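
// Illustrative sketch (not from this file): a gRPC service handler would
// typically route an incoming StepRequest to one of the dispatch methods
// above, based on its oneof payload:
//
//	switch payload := stepRequest.GetPayload().(type) {
//	case *orderer.StepRequest_SubmitRequest:
//		err = comm.DispatchSubmit(ctx, payload.SubmitRequest)
//	case *orderer.StepRequest_ConsensusRequest:
//		err = comm.DispatchConsensus(ctx, payload.ConsensusRequest)
//	}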

// requestContext identifies the sender and channel of the request and returns
// it wrapped in a requestContext
func (c *Comm) requestContext(ctx context.Context, msg proto.Message) (*requestContext, error) {
	channel := c.ChanExt.TargetChannel(msg)
	if channel == "" {
		return nil, errors.Errorf("badly formatted message, cannot extract channel")
	}

	c.Lock.RLock()
	mapping, exists := c.Chan2Members[channel]
	c.Lock.RUnlock()

	if !exists {
		return nil, errors.Errorf("channel %s doesn't exist", channel)
	}

	cert := util.ExtractRawCertificateFromContext(ctx)
	if len(cert) == 0 {
		return nil, errors.Errorf("no TLS certificate sent")
	}

	stub := mapping.LookupByClientCert(cert)
	if stub == nil {
		return nil, errors.Errorf("certificate extracted from TLS connection isn't authorized")
	}

	return &requestContext{
		channel: channel,
		sender:  stub.ID,
	}, nil
}

// Remote obtains a RemoteContext linked to the destination node on the context
// of a given channel
func (c *Comm) Remote(channel string, id uint64) (*RemoteContext, error) {
	c.Lock.RLock()
	defer c.Lock.RUnlock()

	if c.shutdown {
		return nil, errors.New("communication has been shut down")
	}

	mapping, exists := c.Chan2Members[channel]
	if !exists {
		return nil, errors.Errorf("channel %s doesn't exist", channel)
	}

	stub := mapping.ByID(id)
	if stub == nil {
		return nil, errors.Errorf("node %d doesn't exist in channel %s's membership", id, channel)
	}

	if stub.Active() {
		return stub.RemoteContext, nil
	}

	err := stub.Activate(c.createRemoteContext(stub, channel))
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return stub.RemoteContext, nil
}
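
// Illustrative sketch (not from this file): a caller typically obtains a
// RemoteContext and then opens a stream to the node. NewStream and its
// timeout parameter are assumptions about the surrounding package:
//
//	rc, err := comm.Remote("mychannel", 3)
//	if err != nil {
//		return err
//	}
//	stream, err := rc.NewStream(10 * time.Second)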

// Configure configures the channel with the given RemoteNodes
func (c *Comm) Configure(channel string, newNodes []RemoteNode) {
	c.Logger.Infof("Entering, channel: %s, nodes: %v", channel, newNodes)
	defer c.Logger.Infof("Exiting")

	c.Lock.Lock()
	defer c.Lock.Unlock()

	c.createShutdownSignalIfNeeded()
	if c.shutdown {
		return
	}

	beforeConfigChange := c.serverCertsInUse()
	// Update the channel-scoped mapping with the new nodes
	c.applyMembershipConfig(channel, newNodes)
	// Close connections to nodes that are not present in the new membership
	c.cleanUnusedConnections(beforeConfigChange)
}
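
// Illustrative sketch (not from this file): membership is applied per
// channel; how the RemoteNode values are populated is an assumption here:
//
//	node := RemoteNode{ /* ID, Endpoint, and TLS certificates from config */ }
//	comm.Configure("mychannel", []RemoteNode{node})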

func (c *Comm) createShutdownSignalIfNeeded() {
	if c.shutdownSignal == nil {
		c.shutdownSignal = make(chan struct{})
	}
}

// Shutdown shuts down the instance
func (c *Comm) Shutdown() {
	c.Lock.Lock()
	defer c.Lock.Unlock()

	c.createShutdownSignalIfNeeded()
	if !c.shutdown {
		close(c.shutdownSignal)
	}
	c.shutdown = true
	for _, members := range c.Chan2Members {
		members.Foreach(func(id uint64, stub *Stub) {
			c.Connections.Disconnect(stub.ServerTLSCert)
		})
	}
}

// cleanUnusedConnections disconnects all connections that are unused
// at the moment of the invocation
func (c *Comm) cleanUnusedConnections(serverCertsBeforeConfig StringSet) {
	// Scan all nodes after the reconfiguration
	serverCertsAfterConfig := c.serverCertsInUse()
	// Filter out the certificates that remained after the reconfiguration
	serverCertsBeforeConfig.subtract(serverCertsAfterConfig)
	// Close the connections to all these nodes as they shouldn't be in use now
	for serverCertificate := range serverCertsBeforeConfig {
		c.Connections.Disconnect([]byte(serverCertificate))
	}
}

// serverCertsInUse returns the server certificates that are in use
// represented as strings.
func (c *Comm) serverCertsInUse() StringSet {
	endpointsInUse := make(StringSet)
	for _, mapping := range c.Chan2Members {
		endpointsInUse.union(mapping.ServerCertificates())
	}
	return endpointsInUse
}

// applyMembershipConfig sets the given RemoteNodes for the given channel
func (c *Comm) applyMembershipConfig(channel string, newNodes []RemoteNode) {
	mapping := c.getOrCreateMapping(channel)
	newNodeIDs := make(map[uint64]struct{})

	for _, node := range newNodes {
		newNodeIDs[node.ID] = struct{}{}
		c.updateStubInMapping(channel, mapping, node)
	}

	// Remove all stubs without a corresponding node
	// in the new nodes
	mapping.Foreach(func(id uint64, stub *Stub) {
		if _, exists := newNodeIDs[id]; exists {
			c.Logger.Info(id, "exists in both old and new membership for channel", channel, ", skipping its deactivation")
			return
		}
		c.Logger.Info("Deactivated node", id, "whose endpoint is", stub.Endpoint, "as it's removed from membership")
		mapping.Remove(id)
		stub.Deactivate()
	})
}

// updateStubInMapping updates the given RemoteNode and adds it to the MemberMapping
func (c *Comm) updateStubInMapping(channel string, mapping MemberMapping, node RemoteNode) {
	stub := mapping.ByID(node.ID)
	if stub == nil {
		c.Logger.Info("Allocating a new stub for node", node.ID, "with endpoint of", node.Endpoint, "for channel", channel)
		stub = &Stub{}
	}

	// Check if the TLS server certificate of the node has been replaced,
	// and if so, deactivate the stub to trigger a re-creation of its
	// gRPC connection.
	if !bytes.Equal(stub.ServerTLSCert, node.ServerTLSCert) {
		c.Logger.Info("Deactivating node", node.ID, "in channel", channel,
			"with endpoint of", node.Endpoint, "due to TLS certificate change")
		stub.Deactivate()
	}

	// Overwrite the stub Node data with the new data
	stub.RemoteNode = node

	// Put the stub into the mapping
	mapping.Put(stub)

	// Check if the stub needs activation.
	if stub.Active() {
		return
	}

	// Activate the stub
	stub.Activate(c.createRemoteContext(stub, channel))
}

// createRemoteContext returns a function that creates a RemoteContext.
// It is used as a parameter to Stub.Activate() in order to activate
// a stub atomically.
func (c *Comm) createRemoteContext(stub *Stub, channel string) func() (*RemoteContext, error) {
	return func() (*RemoteContext, error) {
		cert, err := x509.ParseCertificate(stub.ServerTLSCert)
		if err != nil {
			pemString := string(pem.EncodeToMemory(&pem.Block{Bytes: stub.ServerTLSCert}))
			c.Logger.Errorf("Invalid DER for channel %s, endpoint %s, ID %d: %v", channel, stub.Endpoint, stub.ID, pemString)
			return nil, errors.Wrap(err, "invalid certificate DER")
		}

		c.Logger.Debug("Connecting to", stub.RemoteNode, "for channel", channel)

		conn, err := c.Connections.Connection(stub.Endpoint, stub.ServerTLSCert)
		if err != nil {
			c.Logger.Warningf("Unable to obtain connection to %d(%s) (channel %s): %v", stub.ID, stub.Endpoint, channel, err)
			return nil, err
		}

		probeConnection := func(conn *grpc.ClientConn) error {
			connState := conn.GetState()
			if connState == connectivity.Connecting {
				return errors.Errorf("connection to %d(%s) is in state %s", stub.ID, stub.Endpoint, connState)
			}
			return nil
		}

		clusterClient := orderer.NewClusterClient(conn)
		getStream := func(ctx context.Context) (StepClientStream, error) {
			stream, err := clusterClient.Step(ctx)
			if err != nil {
				return nil, err
			}
			stepClientStream := &CommClientStream{
				StepClient: stream,
			}
			return stepClientStream, nil
		}

		workerCountReporter := workerCountReporter{
			channel: channel,
		}

		rc := &RemoteContext{
			expiresAt:                        cert.NotAfter,
			minimumExpirationWarningInterval: c.MinimumExpirationWarningInterval,
			certExpWarningThreshold:          c.CertExpWarningThreshold,
			workerCountReporter:              workerCountReporter,
			Channel:                          channel,
			Metrics:                          c.Metrics,
			SendBuffSize:                     c.SendBufferSize,
			shutdownSignal:                   c.shutdownSignal,
			endpoint:                         stub.Endpoint,
			Logger:                           c.Logger,
			ProbeConn:                        probeConnection,
			conn:                             conn,
			GetStreamFunc:                    getStream,
		}
		return rc, nil
	}
}

// getOrCreateMapping creates a MemberMapping for the given channel
// or returns the existing one.
func (c *Comm) getOrCreateMapping(channel string) MemberMapping {
	// Lazily create a mapping if it doesn't already exist
	mapping, exists := c.Chan2Members[channel]
	if !exists {
		mapping = MemberMapping{
			id2stub:       make(map[uint64]*Stub),
			SamePublicKey: c.CompareCertificate,
		}
		c.Chan2Members[channel] = mapping
	}
	return mapping
}

func commonNameFromContext(ctx context.Context) string {
	cert := util.ExtractCertificateFromContext(ctx)
	if cert == nil {
		return "unidentified node"
	}
	return cert.Subject.CommonName
}

// streamsMapperReporter wraps sync.Map and keeps an atomic count of the
// entries stored in it.
type streamsMapperReporter struct {
	size uint32
	sync.Map
}

func (smr *streamsMapperReporter) Delete(key interface{}) {
	smr.Map.Delete(key)
	// Adding ^uint32(0) (i.e. 2^32 - 1) atomically decrements size by one,
	// since unsigned addition wraps around modulo 2^32.
	atomic.AddUint32(&smr.size, ^uint32(0))
}

func (smr *streamsMapperReporter) Store(key, value interface{}) {
	smr.Map.Store(key, value)
	atomic.AddUint32(&smr.size, 1)
}

type workerCountReporter struct {
	channel     string
	workerCount uint32
}

func (wcr *workerCountReporter) increment(m *Metrics) {
	count := atomic.AddUint32(&wcr.workerCount, 1)
	m.reportWorkerCount(wcr.channel, count)
}

func (wcr *workerCountReporter) decrement(m *Metrics) {
	// ^0 flips all zeros to ones, which means 2^32 - 1, and then we add
	// this number to wcr.workerCount. Since unsigned addition wraps around
	// modulo 2^32, wcr.workerCount + 2^32 - 1 = wcr.workerCount - 1 + 2^32,
	// which is just wcr.workerCount - 1.
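	// For example, with workerCount == 3: 3 + 0xFFFFFFFF = 0x1_0000_0002,
	// which wraps modulo 2^32 to 2.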
	count := atomic.AddUint32(&wcr.workerCount, ^uint32(0))
	m.reportWorkerCount(wcr.channel, count)
}

// CommClientStream wraps an orderer.Cluster_StepClient stream so that it
// satisfies the StepClientStream interface.
type CommClientStream struct {
	StepClient orderer.Cluster_StepClient
}

// Send sends the given StepRequest over the underlying gRPC stream.
func (cs *CommClientStream) Send(request *orderer.StepRequest) error {
	return cs.StepClient.Send(request)
}

// Recv receives the next StepResponse from the underlying gRPC stream.
func (cs *CommClientStream) Recv() (*orderer.StepResponse, error) {
	return cs.StepClient.Recv()
}

// Auth is a no-op on the client side; the client is authenticated by the TLS
// certificate of its connection.
func (cs *CommClientStream) Auth() error {
	return nil
}

// Context returns the context of the underlying gRPC stream.
func (cs *CommClientStream) Context() context.Context {
	return cs.StepClient.Context()
}
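
// Illustrative sketch (not from this file): the stream handed out by
// GetStreamFunc is driven roughly as follows; error handling is elided:
//
//	stream, _ := rc.GetStreamFunc(ctx)
//	_ = stream.Send(&orderer.StepRequest{ /* payload */ })
//	resp, _ := stream.Recv()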