212 lines
5.8 KiB
Go
212 lines
5.8 KiB
Go
/*
|
|
Copyright IBM Corp. 2017 All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hyperledger/fabric-protos-go/orderer"
|
|
"github.com/hyperledger/fabric/common/flogging"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
//go:generate mockery --dir . --name StepClient --case underscore --output ./mocks/
|
|
|
|
// StepClient defines a client that sends and receives Step requests and responses.
|
|
type StepClient interface {
|
|
Send(*orderer.StepRequest) error
|
|
Recv() (*orderer.StepResponse, error)
|
|
grpc.ClientStream
|
|
}
|
|
|
|
//go:generate mockery --dir . --name ClusterClient --case underscore --output ./mocks/
|
|
|
|
// ClusterClient creates streams that point to a remote cluster member.
|
|
type ClusterClient interface {
|
|
Step(ctx context.Context, opts ...grpc.CallOption) (orderer.Cluster_StepClient, error)
|
|
}
|
|
|
|
// RPC performs remote procedure calls to remote cluster nodes.
|
|
type RPC struct {
|
|
consensusLock sync.Mutex
|
|
submitLock sync.Mutex
|
|
Logger *flogging.FabricLogger
|
|
Timeout time.Duration
|
|
Channel string
|
|
Comm Communicator
|
|
lock sync.RWMutex
|
|
StreamsByType map[OperationType]map[uint64]*Stream
|
|
}
|
|
|
|
// NewStreamsByType returns a mapping of operation type to
|
|
// a mapping of destination to stream.
|
|
func NewStreamsByType() map[OperationType]map[uint64]*Stream {
|
|
m := make(map[OperationType]map[uint64]*Stream)
|
|
m[ConsensusOperation] = make(map[uint64]*Stream)
|
|
m[SubmitOperation] = make(map[uint64]*Stream)
|
|
return m
|
|
}
|
|
|
|
// OperationType denotes a type of operation that the RPC can perform
|
|
// such as sending a transaction, or a consensus related message.
|
|
type OperationType int
|
|
|
|
const (
|
|
ConsensusOperation OperationType = iota
|
|
SubmitOperation
|
|
)
|
|
|
|
func (ot OperationType) String() string {
|
|
if ot == SubmitOperation {
|
|
return "transaction"
|
|
}
|
|
|
|
return "consensus"
|
|
}
|
|
|
|
// SendConsensus passes the given ConsensusRequest message to the raft.Node instance.
|
|
func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) error {
|
|
if s.Logger.IsEnabledFor(zapcore.DebugLevel) {
|
|
defer s.consensusSent(time.Now(), destination, msg)
|
|
}
|
|
|
|
stream, err := s.getOrCreateStream(destination, ConsensusOperation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req := &orderer.StepRequest{
|
|
Payload: &orderer.StepRequest_ConsensusRequest{
|
|
ConsensusRequest: msg,
|
|
},
|
|
}
|
|
|
|
s.consensusLock.Lock()
|
|
defer s.consensusLock.Unlock()
|
|
|
|
err = stream.Send(req)
|
|
if err != nil {
|
|
s.unMapStream(destination, ConsensusOperation, stream.ID)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// SendSubmit sends a SubmitRequest to the given destination node.
|
|
func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest, report func(error)) error {
|
|
if s.Logger.IsEnabledFor(zapcore.DebugLevel) {
|
|
defer s.submitSent(time.Now(), destination, request)
|
|
}
|
|
|
|
stream, err := s.getOrCreateStream(destination, SubmitOperation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req := &orderer.StepRequest{
|
|
Payload: &orderer.StepRequest_SubmitRequest{
|
|
SubmitRequest: request,
|
|
},
|
|
}
|
|
|
|
unmapOnFailure := func(err error) {
|
|
if err != nil && err.Error() == io.EOF.Error() {
|
|
s.Logger.Infof("Un-mapping transaction stream to %d because encountered a stale stream", destination)
|
|
s.unMapStream(destination, SubmitOperation, stream.ID)
|
|
}
|
|
report(err)
|
|
}
|
|
|
|
s.submitLock.Lock()
|
|
defer s.submitLock.Unlock()
|
|
|
|
err = stream.SendWithReport(req, unmapOnFailure)
|
|
if err != nil {
|
|
s.unMapStream(destination, SubmitOperation, stream.ID)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *RPC) submitSent(start time.Time, to uint64, msg *orderer.SubmitRequest) {
|
|
s.Logger.Debugf("Sending msg of %d bytes to %d on channel %s took %v", submitMsgLength(msg), to, s.Channel, time.Since(start))
|
|
}
|
|
|
|
func (s *RPC) consensusSent(start time.Time, to uint64, msg *orderer.ConsensusRequest) {
|
|
s.Logger.Debugf("Sending msg of %d bytes to %d on channel %s took %v", len(msg.Payload), to, s.Channel, time.Since(start))
|
|
}
|
|
|
|
// getOrCreateStream obtains a Submit stream for the given destination node
|
|
func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType) (*Stream, error) {
|
|
stream := s.getStream(destination, operationType)
|
|
if stream != nil {
|
|
return stream, nil
|
|
}
|
|
stub, err := s.Comm.Remote(s.Channel, destination)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
stream, err = stub.NewStream(s.Timeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.mapStream(destination, stream, operationType)
|
|
return stream, nil
|
|
}
|
|
|
|
func (s *RPC) getStream(destination uint64, operationType OperationType) *Stream {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.StreamsByType[operationType][destination]
|
|
}
|
|
|
|
func (s *RPC) mapStream(destination uint64, stream *Stream, operationType OperationType) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
s.StreamsByType[operationType][destination] = stream
|
|
s.cleanCanceledStreams(operationType)
|
|
}
|
|
|
|
func (s *RPC) unMapStream(destination uint64, operationType OperationType, streamIDToUnmap uint64) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
stream, exists := s.StreamsByType[operationType][destination]
|
|
if !exists {
|
|
s.Logger.Debugf("No %s stream to %d found, nothing to unmap", operationType, destination)
|
|
return
|
|
}
|
|
|
|
if stream.ID != streamIDToUnmap {
|
|
s.Logger.Debugf("Stream for %s to %d has an ID of %d, not %d", operationType, destination, stream.ID, streamIDToUnmap)
|
|
return
|
|
}
|
|
|
|
delete(s.StreamsByType[operationType], destination)
|
|
}
|
|
|
|
func (s *RPC) cleanCanceledStreams(operationType OperationType) {
|
|
for destination, stream := range s.StreamsByType[operationType] {
|
|
if !stream.Canceled() {
|
|
continue
|
|
}
|
|
s.Logger.Infof("Removing stream %d to %d for channel %s because it is canceled", stream.ID, destination, s.Channel)
|
|
delete(s.StreamsByType[operationType], destination)
|
|
}
|
|
}
|
|
|
|
func submitMsgLength(request *orderer.SubmitRequest) int {
|
|
if request.Payload == nil {
|
|
return 0
|
|
}
|
|
return len(request.Payload.Payload)
|
|
}
|