go_study/fabric-main/gossip/comm/conn.go

343 lines
8.9 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"context"
"sync"
proto "github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/protoext"
"github.com/hyperledger/fabric/gossip/util"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
type handler func(message *protoext.SignedGossipMessage)
type blockingBehavior bool
const (
blockingSend = blockingBehavior(true)
nonBlockingSend = blockingBehavior(false)
)
type connFactory interface {
createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}
type connectionStore struct {
config ConnConfig
logger util.Logger // logger
isClosing bool // whether this connection store is shutting down
shutdownOnce sync.Once // ensure shutdown is only called once per connectionStore
connFactory connFactory // creates a connection to remote peer
sync.RWMutex // synchronize access to shared variables
pki2Conn map[string]*connection // mapping between pkiID to connections
destinationLocks map[string]*sync.Mutex // mapping between pkiIDs and locks,
// used to prevent concurrent connection establishment to the same remote endpoint
}
func newConnStore(connFactory connFactory, logger util.Logger, config ConnConfig) *connectionStore {
return &connectionStore{
connFactory: connFactory,
isClosing: false,
pki2Conn: make(map[string]*connection),
destinationLocks: make(map[string]*sync.Mutex),
logger: logger,
config: config,
}
}
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
cs.RLock()
isClosing := cs.isClosing
cs.RUnlock()
if isClosing {
return nil, errors.Errorf("conn store is closing")
}
pkiID := peer.PKIID
endpoint := peer.Endpoint
cs.Lock()
destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
if !hasConnected {
destinationLock = &sync.Mutex{}
cs.destinationLocks[string(pkiID)] = destinationLock
}
cs.Unlock()
destinationLock.Lock()
cs.RLock()
conn, exists := cs.pki2Conn[string(pkiID)]
if exists {
cs.RUnlock()
destinationLock.Unlock()
return conn, nil
}
cs.RUnlock()
createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
if err == nil {
cs.logger.Debugf("Created new connection to %s, %s", endpoint, pkiID)
}
destinationLock.Unlock()
cs.RLock()
isClosing = cs.isClosing
cs.RUnlock()
if isClosing {
return nil, errors.Errorf("conn store is closing")
}
cs.Lock()
delete(cs.destinationLocks, string(pkiID))
defer cs.Unlock()
// check again, maybe someone connected to us during the connection creation?
conn, exists = cs.pki2Conn[string(pkiID)]
if exists {
if createdConnection != nil {
createdConnection.close()
}
return conn, nil
}
// no one connected to us AND we failed connecting!
if err != nil {
return nil, err
}
// At this point we know the latest PKI-ID of the remote peer.
// Perhaps we are trying to connect to a peer that changed its PKI-ID.
// Ensure that we are not already connected to that PKI-ID,
// and if so - close the old connection in order to keep the latest one.
// We keep the latest one because the remote peer is going to close
// our old connection anyway once it sensed this connection handshake.
if conn, exists := cs.pki2Conn[string(createdConnection.pkiID)]; exists {
conn.close()
}
// at this point in the code, we created a connection to a remote peer
conn = createdConnection
cs.pki2Conn[string(createdConnection.pkiID)] = conn
go conn.serviceConnection()
return conn, nil
}
func (cs *connectionStore) connNum() int {
cs.RLock()
defer cs.RUnlock()
return len(cs.pki2Conn)
}
func (cs *connectionStore) shutdown() {
cs.shutdownOnce.Do(func() {
cs.Lock()
cs.isClosing = true
for _, conn := range cs.pki2Conn {
conn.close()
}
cs.pki2Conn = make(map[string]*connection)
cs.Unlock()
})
}
// onConnected closes any connection to the remote peer and creates a new connection object to it in order to have only
// one single bi-directional connection between a pair of peers
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer,
connInfo *protoext.ConnectionInfo, metrics *metrics.CommMetrics) *connection {
cs.Lock()
defer cs.Unlock()
if c, exists := cs.pki2Conn[string(connInfo.ID)]; exists {
c.close()
}
conn := newConnection(nil, nil, serverStream, metrics, cs.config)
conn.pkiID = connInfo.ID
conn.info = connInfo
conn.logger = cs.logger
cs.pki2Conn[string(connInfo.ID)] = conn
return conn
}
func (cs *connectionStore) closeConnByPKIid(pkiID common.PKIidType) {
cs.Lock()
defer cs.Unlock()
if conn, exists := cs.pki2Conn[string(pkiID)]; exists {
conn.close()
delete(cs.pki2Conn, string(pkiID))
}
}
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, s stream, metrics *metrics.CommMetrics, config ConnConfig) *connection {
connection := &connection{
metrics: metrics,
outBuff: make(chan *msgSending, config.SendBuffSize),
cl: cl,
conn: c,
gossipStream: s,
stopChan: make(chan struct{}, 1),
recvBuffSize: config.RecvBuffSize,
}
return connection
}
// ConnConfig is the configuration required to initialize a new conn
type ConnConfig struct {
RecvBuffSize int
SendBuffSize int
}
type connection struct {
recvBuffSize int
metrics *metrics.CommMetrics
cancel context.CancelFunc
info *protoext.ConnectionInfo
outBuff chan *msgSending
logger util.Logger // logger
pkiID common.PKIidType // pkiID of the remote endpoint
handler handler // function to invoke upon a message reception
conn *grpc.ClientConn // gRPC connection to remote endpoint
cl proto.GossipClient // gRPC stub of remote endpoint
gossipStream stream // there can only be one
stopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routine
stopOnce sync.Once // once to ensure close is called only once
}
func (conn *connection) close() {
conn.stopOnce.Do(func() {
close(conn.stopChan)
if conn.conn != nil {
conn.conn.Close()
}
if conn.cancel != nil {
conn.cancel()
}
})
}
func (conn *connection) send(msg *protoext.SignedGossipMessage, onErr func(error), shouldBlock blockingBehavior) {
m := &msgSending{
envelope: msg.Envelope,
onErr: onErr,
}
select {
case conn.outBuff <- m:
// room in channel, successfully sent message, nothing to do
case <-conn.stopChan:
conn.logger.Debugf("Aborting send() to %s because connection is closing", conn.info.Endpoint)
default: // did not send
if shouldBlock {
select {
case conn.outBuff <- m: // try again, and wait to send
case <-conn.stopChan: // stop blocking if the connection is closing
}
} else {
conn.metrics.BufferOverflow.Add(1)
conn.logger.Debugf("Buffer to %s overflowed, dropping message %s", conn.info.Endpoint, msg)
}
}
}
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *protoext.SignedGossipMessage, conn.recvBuffSize)
// Call stream.Recv() asynchronously in readFromStream(),
// and wait for either the Recv() call to end,
// or a signal to close the connection, which exits
// the method and makes the Recv() call to fail in the
// readFromStream() method
go conn.readFromStream(errChan, msgChan)
go conn.writeToStream()
for {
select {
case <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
return nil
case err := <-errChan:
return err
case msg := <-msgChan:
conn.handler(msg)
}
}
}
func (conn *connection) writeToStream() {
stream := conn.gossipStream
for {
select {
case m := <-conn.outBuff:
err := stream.Send(m.envelope)
if err != nil {
go m.onErr(err)
return
}
conn.metrics.SentMessages.Add(1)
case <-conn.stopChan:
conn.logger.Debug("Closing writing to stream")
return
}
}
}
func (conn *connection) readFromStream(errChan chan error, msgChan chan *protoext.SignedGossipMessage) {
stream := conn.gossipStream
for {
select {
case <-conn.stopChan:
return
default:
envelope, err := stream.Recv()
if err != nil {
errChan <- err
conn.logger.Debugf("Got error, aborting: %v", err)
return
}
conn.metrics.ReceivedMessages.Add(1)
msg, err := protoext.EnvelopeToGossipMessage(envelope)
if err != nil {
errChan <- err
conn.logger.Warningf("Got error, aborting: %v", err)
return
}
select {
case <-conn.stopChan:
case msgChan <- msg:
}
}
}
}
type msgSending struct {
envelope *proto.Envelope
onErr func(error)
}
//go:generate mockery -dir . -name MockStream -case underscore -output mocks/
type MockStream interface {
proto.Gossip_GossipStreamClient
}