go_study/fabric-main/orderer/common/cluster/commauth.go

350 lines
9.5 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package cluster
import (
"context"
"encoding/asn1"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// AuthCommMgr implements the Communicator.
// It manages the client side connections and streams established with
// the Cluster GRPC server and new Cluster service.
type AuthCommMgr struct {
// Logger receives the info, debug and warning messages emitted by the manager.
Logger *flogging.FabricLogger
// Metrics is handed to every RemoteContext the manager creates.
Metrics *Metrics
// Lock is taken by Remote (read lock), Configure and Shutdown (write lock)
// around their accesses to shutdown, shutdownSignal and Chan2Members.
Lock sync.RWMutex
// shutdown is set by Shutdown; once set, Remote returns an error and
// Configure becomes a no-op.
shutdown bool
// shutdownSignal is created lazily by Configure, closed by Shutdown and
// passed to every RemoteContext the manager creates.
shutdownSignal chan struct{}
// Chan2Members holds the membership mapping of each configured channel.
Chan2Members MembersByChannel
// Connections is used to connect to, and disconnect from, remote endpoints.
Connections *ConnectionsMgr
// SendBufferSize is copied into the SendBuffSize of each RemoteContext.
SendBufferSize int
// NodeIdentity is used to look up this node's own stub in a channel's
// membership when a new stream is created.
NodeIdentity []byte
// Signer is handed to each NodeClientStream, which uses it to sign the
// authentication request.
Signer identity.Signer
}
// Remote returns the RemoteContext used to communicate with node id on the
// given channel, activating the node's stub first when it is not yet active.
//
// An error is returned when the communicator has been shut down, when the
// channel is unknown, when the node is not part of the channel's membership,
// or when activating the stub fails.
func (ac *AuthCommMgr) Remote(channel string, id uint64) (*RemoteContext, error) {
	ac.Lock.RLock()
	defer ac.Lock.RUnlock()

	if ac.shutdown {
		return nil, errors.New("communication has been shut down")
	}

	members, found := ac.Chan2Members[channel]
	if !found {
		return nil, errors.Errorf("channel %s doesn't exist", channel)
	}

	target := members.ByID(id)
	if target == nil {
		return nil, errors.Errorf("node %d doesn't exist in channel %s's membership", id, channel)
	}

	// Only an inactive stub needs a remote context to be created for it.
	if !target.Active() {
		if err := target.Activate(ac.createRemoteContext(target, channel)); err != nil {
			return nil, errors.WithStack(err)
		}
	}

	return target.RemoteContext, nil
}
// Configure replaces the membership of the given channel with members.
//
// Every node in members is added to (or refreshed in) the channel's mapping.
// Every stub already in the mapping whose ID is not in members is removed,
// deactivated and its endpoint disconnected. The call is a no-op once the
// communicator has been shut down.
func (ac *AuthCommMgr) Configure(channel string, members []RemoteNode) {
	ac.Logger.Infof("Configuring communication module for Channel: %s with nodes:%v", channel, members)

	ac.Lock.Lock()
	defer ac.Lock.Unlock()

	if ac.shutdown {
		return
	}

	// The shutdown signal is created on first configuration.
	if ac.shutdownSignal == nil {
		ac.shutdownSignal = make(chan struct{})
	}

	channelMembers := ac.getOrCreateMapping(channel)

	// Record the IDs of the new membership while refreshing their stubs.
	retained := make(map[uint64]struct{}, len(members))
	for _, member := range members {
		retained[member.ID] = struct{}{}
		ac.updateStubInMapping(channel, channelMembers, member)
	}

	// Drop every stub that has no counterpart in the new membership.
	channelMembers.Foreach(func(nodeID uint64, existing *Stub) {
		_, stillMember := retained[nodeID]
		if stillMember {
			ac.Logger.Infof("Node with ID %v exists in new membership of channel %v", nodeID, channel)
			return
		}

		ac.Logger.Infof("Deactivated node %v who's endpoint is %v", nodeID, existing.Endpoint)
		channelMembers.Remove(nodeID)
		existing.Deactivate()
		ac.Connections.Disconnect(existing.Endpoint)
	})
}
// Shutdown marks the communicator as shut down, closes the shutdown signal
// (if one was created and not yet closed) and disconnects the endpoint of
// every stub in every channel. It may be called more than once; the signal is
// closed only on the first call.
func (ac *AuthCommMgr) Shutdown() {
	ac.Lock.Lock()
	defer ac.Lock.Unlock()

	if !ac.shutdown && ac.shutdownSignal != nil {
		close(ac.shutdownSignal)
	}
	ac.shutdown = true

	for _, members := range ac.Chan2Members {
		members.Foreach(func(id uint64, stub *Stub) {
			// The endpoint recorded at activation lives on the stub's
			// RemoteContext. A stub whose activation failed has no
			// RemoteContext (presumably nil — verify against Stub), so
			// reading through it unconditionally would dereference nil.
			// Fall back to the configured node endpoint, which is what
			// Configure disconnects as well.
			endpoint := stub.Endpoint
			if stub.RemoteContext != nil {
				endpoint = stub.RemoteContext.endpoint
			}
			ac.Connections.Disconnect(endpoint)
		})
	}
}
// getOrCreateMapping returns the MemberMapping of the given channel, creating
// and registering an empty one when the channel has none yet.
//
// Chan2Members itself is allocated on first use, so an AuthCommMgr whose
// Chan2Members was never initialized does not panic on the map write below.
// The function does no locking of its own; Configure calls it while holding
// ac.Lock.
func (ac *AuthCommMgr) getOrCreateMapping(channel string) MemberMapping {
	// Writing to a nil map panics, so make sure the top-level map exists.
	if ac.Chan2Members == nil {
		ac.Chan2Members = make(MembersByChannel)
	}

	if mapping, exists := ac.Chan2Members[channel]; exists {
		return mapping
	}

	// Lazily create a mapping if it doesn't already exist.
	mapping := MemberMapping{
		id2stub: make(map[uint64]*Stub),
	}
	ac.Chan2Members[channel] = mapping
	return mapping
}
// updateStubInMapping stores node in the given mapping, allocating a new stub
// when the node's ID has none yet, and attempts to activate the stub if it is
// not already active.
func (ac *AuthCommMgr) updateStubInMapping(channel string, mapping MemberMapping, node RemoteNode) {
	entry := mapping.ByID(node.ID)
	if entry == nil {
		ac.Logger.Infof("Allocating a new stub for node %v with endpoint %v for channel %s", node.ID, node.Endpoint, channel)
		entry = &Stub{}
	}

	// Refresh the node data and (re-)register the stub.
	entry.RemoteNode = node
	mapping.Put(entry)

	if !entry.Active() {
		// The activation error is deliberately discarded: a connection
		// failure is already logged by the remote-context factory, and
		// Remote attempts activation again on demand.
		_ = entry.Activate(ac.createRemoteContext(entry, channel))
	}
}
// createRemoteContext returns a factory that, when invoked, connects to the
// stub's endpoint and builds the RemoteContext for the given channel.
//
// The factory fails when the connection cannot be obtained. The RemoteContext
// it produces carries two callbacks:
//   - ProbeConn reports an error while the connection is in the Connecting state;
//   - GetStreamFunc opens a Step stream and wraps it in a NodeClientStream whose
//     source ID is that of the stub matching ac.NodeIdentity in the channel.
func (ac *AuthCommMgr) createRemoteContext(stub *Stub, channel string) func() (*RemoteContext, error) {
	return func() (*RemoteContext, error) {
		ac.Logger.Debugf("Connecting to node: %v for channel: %v", stub.RemoteNode.NodeAddress, channel)

		conn, err := ac.Connections.Connect(stub.Endpoint, stub.RemoteNode.ServerRootCA)
		if err != nil {
			ac.Logger.Warningf("Unable to obtain connection to %d(%s) (channel %s): %v", stub.ID, stub.Endpoint, channel, err)
			return nil, err
		}

		checkState := func(c *grpc.ClientConn) error {
			state := c.GetState()
			if state == connectivity.Connecting {
				return errors.Errorf("connection to %d(%s) is in state %s", stub.ID, stub.Endpoint, state)
			}
			return nil
		}

		client := orderer.NewClusterNodeServiceClient(conn)

		openStream := func(ctx context.Context) (StepClientStream, error) {
			stepStream, err := client.Step(ctx)
			if err != nil {
				return nil, err
			}

			// NOTE(review): Chan2Members is read here without taking
			// ac.Lock in this closure — confirm the callers' locking.
			channelMembers, known := ac.Chan2Members[channel]
			if !known {
				return nil, errors.Errorf("channel members not initialized")
			}

			self := channelMembers.LookupByIdentity(ac.NodeIdentity)
			if self == nil {
				return nil, errors.Errorf("node identity is missing in channel")
			}

			return &NodeClientStream{
				Version:           0,
				StepClient:        stepStream,
				SourceNodeID:      self.ID,
				DestinationNodeID: stub.ID,
				Signer:            ac.Signer,
				Channel:           channel,
			}, nil
		}

		return &RemoteContext{
			Metrics:             ac.Metrics,
			workerCountReporter: workerCountReporter{channel: channel},
			Channel:             channel,
			SendBuffSize:        ac.SendBufferSize,
			endpoint:            stub.Endpoint,
			Logger:              ac.Logger,
			ProbeConn:           checkState,
			conn:                conn,
			GetStreamFunc:       openStream,
			shutdownSignal:      ac.shutdownSignal,
		}, nil
	}
}
// NodeClientStream wraps a ClusterNodeService Step client stream together
// with the values needed to build and sign the authentication request.
type NodeClientStream struct {
// StepClient is the underlying stream that requests are sent on and
// responses are received from.
StepClient orderer.ClusterNodeService_StepClient
// Version is copied into the Version field of the authentication request.
Version uint32
// SourceNodeID is sent as the FromId of the authentication request.
SourceNodeID uint64
// DestinationNodeID is sent as the ToId of the authentication request.
DestinationNodeID uint64
// Signer signs the authentication request; Auth fails when it is nil.
Signer identity.Signer
// Channel is the channel name carried in the authentication request.
Channel string
}
// Send converts request into a ClusterNodeServiceStepRequest and sends it on
// the underlying stream. A conversion failure is returned without sending
// anything.
func (cs *NodeClientStream) Send(request *orderer.StepRequest) error {
	converted, err := BuildStepRequest(request)
	if err != nil {
		return err
	}

	return cs.StepClient.Send(converted)
}
// Recv receives the next response from the underlying stream and converts it
// into a StepResponse. A receive error is returned as is.
func (cs *NodeClientStream) Recv() (*orderer.StepResponse, error) {
	received, err := cs.StepClient.Recv()
	if err == nil {
		return BuildStepRespone(received)
	}

	return nil, err
}
// Auth builds a NodeAuthRequest for this stream, signs it and sends it on the
// underlying stream.
//
// The request carries the stream's version, the current UTC time, the source
// and destination node IDs and the channel. Its session binding is obtained
// from GetTLSSessionBinding using the stream context and the hash returned by
// GetSessionBindingHash. The signature is computed over the ASN.1 encoding of
// those fields.
//
// An error is returned when the signer is nil, or when creating the timestamp,
// obtaining the session binding, encoding the signed fields, signing, or
// sending the request fails.
func (cs *NodeClientStream) Auth() error {
	if cs.Signer == nil {
		return errors.New("signer is nil")
	}

	timestamp, err := ptypes.TimestampProto(time.Now().UTC())
	if err != nil {
		return errors.Wrap(err, "failed to read timestamp")
	}

	payload := &orderer.NodeAuthRequest{
		Version:   cs.Version,
		Timestamp: timestamp,
		FromId:    cs.SourceNodeID,
		ToId:      cs.DestinationNodeID,
		Channel:   cs.Channel,
	}

	bindingFieldsHash := GetSessionBindingHash(payload)

	tlsBinding, err := GetTLSSessionBinding(cs.StepClient.Context(), bindingFieldsHash)
	if err != nil {
		return errors.Wrap(err, "TLSBinding failed")
	}
	payload.SessionBinding = tlsBinding

	asnSignFields, err := asn1.Marshal(AuthRequestSignature{
		Version:        int64(payload.Version),
		Timestamp:      payload.Timestamp.String(),
		FromId:         strconv.FormatUint(payload.FromId, 10),
		ToId:           strconv.FormatUint(payload.ToId, 10),
		SessionBinding: payload.SessionBinding,
		Channel:        payload.Channel,
	})
	if err != nil {
		// Never sign (and send) an empty buffer when the encoding failed.
		return errors.Wrap(err, "marshaling signature fields failed")
	}

	sig, err := cs.Signer.Sign(asnSignFields)
	if err != nil {
		return errors.Wrap(err, "signing failed")
	}
	payload.Signature = sig

	stepRequest := &orderer.ClusterNodeServiceStepRequest{
		Payload: &orderer.ClusterNodeServiceStepRequest_NodeAuthrequest{
			NodeAuthrequest: payload,
		},
	}

	return cs.StepClient.Send(stepRequest)
}
// Context returns the context of the underlying Step client stream.
func (cs *NodeClientStream) Context() context.Context {
	streamCtx := cs.StepClient.Context()
	return streamCtx
}
// BuildStepRequest converts a StepRequest into the equivalent
// ClusterNodeServiceStepRequest.
//
// A consensus request becomes a NodeConsensusRequest carrying the same payload
// and metadata; a submit request becomes a NodeTransactionOrderRequest carrying
// the same payload and last validation sequence. The consensus request is
// checked first. An error is returned when request is nil or carries neither.
func BuildStepRequest(request *orderer.StepRequest) (*orderer.ClusterNodeServiceStepRequest, error) {
	if request == nil {
		return nil, errors.New("request is nil")
	}

	if consensus := request.GetConsensusRequest(); consensus != nil {
		return &orderer.ClusterNodeServiceStepRequest{
			Payload: &orderer.ClusterNodeServiceStepRequest_NodeConrequest{
				NodeConrequest: &orderer.NodeConsensusRequest{
					Payload:  consensus.Payload,
					Metadata: consensus.Metadata,
				},
			},
		}, nil
	}

	if submit := request.GetSubmitRequest(); submit != nil {
		return &orderer.ClusterNodeServiceStepRequest{
			Payload: &orderer.ClusterNodeServiceStepRequest_NodeTranrequest{
				NodeTranrequest: &orderer.NodeTransactionOrderRequest{
					Payload:           submit.Payload,
					LastValidationSeq: submit.LastValidationSeq,
				},
			},
		}, nil
	}

	return nil, errors.New("service message type not valid")
}
// BuildStepRespone converts a ClusterNodeServiceStepResponse into a
// StepResponse.
//
// Only a transaction-order response is supported: its channel and status are
// copied into a SubmitResponse. An error is returned when stepResponse is nil
// or does not carry a transaction-order response.
func BuildStepRespone(stepResponse *orderer.ClusterNodeServiceStepResponse) (*orderer.StepResponse, error) {
	if stepResponse == nil {
		return nil, errors.New("input response object is nil")
	}

	orderRes := stepResponse.GetTranorderRes()
	if orderRes == nil {
		return nil, errors.New("service stream returned with invalid response type")
	}

	submitRes := &orderer.SubmitResponse{
		Channel: orderRes.Channel,
		Status:  orderRes.Status,
	}

	return &orderer.StepResponse{
		Payload: &orderer.StepResponse_SubmitRes{SubmitRes: submitRes},
	}, nil
}