/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
	"context"
	"sync"

	proto "github.com/hyperledger/fabric-protos-go/gossip"
	"github.com/hyperledger/fabric/gossip/common"
	"github.com/hyperledger/fabric/gossip/metrics"
	"github.com/hyperledger/fabric/gossip/protoext"
	"github.com/hyperledger/fabric/gossip/util"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

// handler is invoked for every signed gossip message received on a connection.
type handler func(message *protoext.SignedGossipMessage)

// blockingBehavior controls whether send() blocks when the outgoing buffer is full.
type blockingBehavior bool

const (
	blockingSend    = blockingBehavior(true)
	nonBlockingSend = blockingBehavior(false)
)

// connFactory creates a connection to a remote peer.
type connFactory interface {
	createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}
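
// A minimal sketch (not part of the original file) of a function adapter that
// satisfies connFactory, in the spirit of http.HandlerFunc; the name
// connFactoryFunc is hypothetical and shown only to illustrate how a test
// double could be wired in.
type connFactoryFunc func(endpoint string, pkiID common.PKIidType) (*connection, error)

// createConnection delegates to the wrapped function.
func (f connFactoryFunc) createConnection(endpoint string, pkiID common.PKIidType) (*connection, error) {
	return f(endpoint, pkiID)
}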

type connectionStore struct {
	config           ConnConfig
	logger           util.Logger            // logger
	isClosing        bool                   // whether this connection store is shutting down
	shutdownOnce     sync.Once              // ensure shutdown is only called once per connectionStore
	connFactory      connFactory            // creates a connection to a remote peer
	sync.RWMutex                            // synchronize access to shared variables
	pki2Conn         map[string]*connection // mapping from pkiID to connection
	destinationLocks map[string]*sync.Mutex // mapping from pkiIDs to locks,
	// used to prevent concurrent connection establishment to the same remote endpoint
}

// newConnStore creates a connectionStore that dials new connections via the given connFactory.
func newConnStore(connFactory connFactory, logger util.Logger, config ConnConfig) *connectionStore {
	return &connectionStore{
		connFactory:      connFactory,
		isClosing:        false,
		pki2Conn:         make(map[string]*connection),
		destinationLocks: make(map[string]*sync.Mutex),
		logger:           logger,
		config:           config,
	}
}

// getConnection returns an existing connection to the given peer, or
// establishes a new one. A per-destination lock ensures that at most one
// goroutine dials the same remote endpoint at a time, and the connection map
// is re-checked after dialing in case the remote peer connected to us in the
// meantime.
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
	cs.RLock()
	isClosing := cs.isClosing
	cs.RUnlock()

	if isClosing {
		return nil, errors.Errorf("conn store is closing")
	}

	pkiID := peer.PKIID
	endpoint := peer.Endpoint

	cs.Lock()
	destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
	if !hasConnected {
		destinationLock = &sync.Mutex{}
		cs.destinationLocks[string(pkiID)] = destinationLock
	}
	cs.Unlock()

	destinationLock.Lock()

	cs.RLock()
	conn, exists := cs.pki2Conn[string(pkiID)]
	if exists {
		cs.RUnlock()
		destinationLock.Unlock()
		return conn, nil
	}
	cs.RUnlock()

	createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
	if err == nil {
		cs.logger.Debugf("Created new connection to %s, %s", endpoint, pkiID)
	}

	destinationLock.Unlock()

	cs.RLock()
	isClosing = cs.isClosing
	cs.RUnlock()
	if isClosing {
		return nil, errors.Errorf("conn store is closing")
	}

	cs.Lock()
	delete(cs.destinationLocks, string(pkiID))
	defer cs.Unlock()

	// Check again: perhaps the remote peer connected to us while the connection was being created.
	conn, exists = cs.pki2Conn[string(pkiID)]

	if exists {
		if createdConnection != nil {
			createdConnection.close()
		}
		return conn, nil
	}

	// No one connected to us AND we failed connecting!
	if err != nil {
		return nil, err
	}

	// At this point we know the latest PKI-ID of the remote peer.
	// Perhaps we are trying to connect to a peer that changed its PKI-ID.
	// Ensure that we are not already connected to that PKI-ID,
	// and if so - close the old connection in order to keep the latest one.
	// We keep the latest one because the remote peer is going to close
	// our old connection anyway once it senses this connection handshake.
	if conn, exists := cs.pki2Conn[string(createdConnection.pkiID)]; exists {
		conn.close()
	}

	// At this point in the code, we have created a connection to a remote peer.
	conn = createdConnection
	cs.pki2Conn[string(createdConnection.pkiID)] = conn

	go conn.serviceConnection()

	return conn, nil
}
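
// A minimal usage sketch (illustrative, not taken from this file): the caller
// supplies both the endpoint to dial and the PKI-ID it expects, and may then
// send on the returned connection. The address and the onSendError callback
// below are hypothetical.
//
//	conn, err := cs.getConnection(&RemotePeer{
//		Endpoint: "peer0.example.com:7051",
//		PKIID:    pkiID,
//	})
//	if err == nil {
//		conn.send(msg, onSendError, nonBlockingSend)
//	}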

// connNum returns the number of live connections in the store.
func (cs *connectionStore) connNum() int {
	cs.RLock()
	defer cs.RUnlock()
	return len(cs.pki2Conn)
}

// shutdown marks the store as closing and closes all live connections.
// It is safe to call multiple times; only the first call has an effect.
func (cs *connectionStore) shutdown() {
	cs.shutdownOnce.Do(func() {
		cs.Lock()
		cs.isClosing = true

		for _, conn := range cs.pki2Conn {
			conn.close()
		}
		cs.pki2Conn = make(map[string]*connection)

		cs.Unlock()
	})
}

// onConnected closes any existing connection to the remote peer and registers
// a new connection object for it, so that there is only a single
// bi-directional connection between any pair of peers.
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer,
	connInfo *protoext.ConnectionInfo, metrics *metrics.CommMetrics) *connection {
	cs.Lock()
	defer cs.Unlock()

	if c, exists := cs.pki2Conn[string(connInfo.ID)]; exists {
		c.close()
	}

	conn := newConnection(nil, nil, serverStream, metrics, cs.config)
	conn.pkiID = connInfo.ID
	conn.info = connInfo
	conn.logger = cs.logger
	cs.pki2Conn[string(connInfo.ID)] = conn
	return conn
}

// closeConnByPKIid closes and removes the connection to the peer with the
// given PKI-ID, if one exists.
func (cs *connectionStore) closeConnByPKIid(pkiID common.PKIidType) {
	cs.Lock()
	defer cs.Unlock()
	if conn, exists := cs.pki2Conn[string(pkiID)]; exists {
		conn.close()
		delete(cs.pki2Conn, string(pkiID))
	}
}

// newConnection creates a connection wrapper around the given stream. For
// client-side (dialed) connections, cl and c are the gRPC stub and the
// underlying client connection; for server-side streams they are nil.
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, s stream, metrics *metrics.CommMetrics, config ConnConfig) *connection {
	connection := &connection{
		metrics:      metrics,
		outBuff:      make(chan *msgSending, config.SendBuffSize),
		cl:           cl,
		conn:         c,
		gossipStream: s,
		stopChan:     make(chan struct{}, 1),
		recvBuffSize: config.RecvBuffSize,
	}
	return connection
}

// ConnConfig is the configuration required to initialize a new connection.
type ConnConfig struct {
	RecvBuffSize int // capacity, in messages, of the incoming-message buffer
	SendBuffSize int // capacity, in messages, of the outgoing-message buffer
}
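
// An illustrative configuration (the sizes below are arbitrary example
// values, not defaults defined in this file): RecvBuffSize bounds the
// per-connection incoming message channel, SendBuffSize the outgoing one.
//
//	cfg := ConnConfig{
//		RecvBuffSize: 20,
//		SendBuffSize: 200,
//	}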

type connection struct {
	recvBuffSize int
	metrics      *metrics.CommMetrics
	cancel       context.CancelFunc
	info         *protoext.ConnectionInfo
	outBuff      chan *msgSending
	logger       util.Logger        // logger
	pkiID        common.PKIidType   // pkiID of the remote endpoint
	handler      handler            // function to invoke upon receiving a message
	conn         *grpc.ClientConn   // gRPC connection to the remote endpoint
	cl           proto.GossipClient // gRPC stub of the remote endpoint
	gossipStream stream             // there can only be one stream per connection
	stopChan     chan struct{}      // used to stop the server-side gRPC call from a different goroutine
	stopOnce     sync.Once          // ensures close is called only once
}

// close shuts the connection down: it signals the service goroutines to stop,
// closes the underlying gRPC connection (if any), and cancels the stream
// context (if any). Subsequent calls are no-ops.
func (conn *connection) close() {
	conn.stopOnce.Do(func() {
		close(conn.stopChan)

		if conn.conn != nil {
			conn.conn.Close()
		}

		if conn.cancel != nil {
			conn.cancel()
		}
	})
}

// send enqueues msg for transmission. If the outgoing buffer is full, the
// message is either dropped (nonBlockingSend) or the call blocks until there
// is room or the connection closes (blockingSend). onErr is invoked if the
// message later fails to be written to the stream.
func (conn *connection) send(msg *protoext.SignedGossipMessage, onErr func(error), shouldBlock blockingBehavior) {
	m := &msgSending{
		envelope: msg.Envelope,
		onErr:    onErr,
	}

	select {
	case conn.outBuff <- m:
		// room in channel, successfully sent message, nothing to do
	case <-conn.stopChan:
		conn.logger.Debugf("Aborting send() to %s because connection is closing", conn.info.Endpoint)
	default: // did not send
		if shouldBlock {
			select {
			case conn.outBuff <- m: // try again, and wait to send
			case <-conn.stopChan: // stop blocking if the connection is closing
			}
		} else {
			conn.metrics.BufferOverflow.Add(1)
			conn.logger.Debugf("Buffer to %s overflowed, dropping message %s", conn.info.Endpoint, msg)
		}
	}
}
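
// A minimal usage sketch (illustrative): a best-effort gossip push uses
// nonBlockingSend and accepts drops on overflow, while blockingSend applies
// back-pressure to the caller instead. The error callback body below is
// hypothetical.
//
//	conn.send(msg, func(err error) {
//		logger.Warningf("send to %s failed: %v", conn.info.Endpoint, err)
//	}, nonBlockingSend)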

// serviceConnection runs the connection's read and write loops. It spawns
// readFromStream() and writeToStream() and then dispatches incoming messages
// to the handler until an error occurs or the connection is closed.
func (conn *connection) serviceConnection() error {
	errChan := make(chan error, 1)
	msgChan := make(chan *protoext.SignedGossipMessage, conn.recvBuffSize)
	// Call stream.Recv() asynchronously in readFromStream(),
	// and wait for either the Recv() call to end,
	// or a signal to close the connection, which exits
	// the method and makes the Recv() call fail in the
	// readFromStream() method.
	go conn.readFromStream(errChan, msgChan)

	go conn.writeToStream()

	for {
		select {
		case <-conn.stopChan:
			conn.logger.Debug("Closing reading from stream")
			return nil
		case err := <-errChan:
			return err
		case msg := <-msgChan:
			conn.handler(msg)
		}
	}
}
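
// For reference, a summary of the per-connection goroutine topology set up above:
//
//	serviceConnection  dispatches msgChan to conn.handler
//	├─ readFromStream  stream.Recv() -> msgChan (errors to errChan)
//	└─ writeToStream   conn.outBuff  -> stream.Send()
//
// All three loops terminate once stopChan is closed via connection.close().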

// writeToStream drains the outgoing buffer and writes each envelope to the
// stream, until a send fails or the connection is closed.
func (conn *connection) writeToStream() {
	stream := conn.gossipStream
	for {
		select {
		case m := <-conn.outBuff:
			err := stream.Send(m.envelope)
			if err != nil {
				go m.onErr(err)
				return
			}
			conn.metrics.SentMessages.Add(1)
		case <-conn.stopChan:
			conn.logger.Debug("Closing writing to stream")
			return
		}
	}
}

// readFromStream receives envelopes from the stream, converts them to signed
// gossip messages, and forwards them on msgChan. Any receive or conversion
// error is reported on errChan and terminates the loop.
func (conn *connection) readFromStream(errChan chan error, msgChan chan *protoext.SignedGossipMessage) {
	stream := conn.gossipStream
	for {
		select {
		case <-conn.stopChan:
			return
		default:
			envelope, err := stream.Recv()
			if err != nil {
				errChan <- err
				conn.logger.Debugf("Got error, aborting: %v", err)
				return
			}
			conn.metrics.ReceivedMessages.Add(1)
			msg, err := protoext.EnvelopeToGossipMessage(envelope)
			if err != nil {
				errChan <- err
				conn.logger.Warningf("Got error, aborting: %v", err)
				return
			}
			// Don't block on a full msgChan if the connection is being closed.
			select {
			case <-conn.stopChan:
			case msgChan <- msg:
			}
		}
	}
}

// msgSending couples an envelope pending transmission with its error callback.
type msgSending struct {
	envelope *proto.Envelope
	onErr    func(error)
}

//go:generate mockery -dir . -name MockStream -case underscore -output mocks/

// MockStream is used to generate a mock of the gossip stream for testing.
type MockStream interface {
	proto.Gossip_GossipStreamClient
}