/* Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ package comm import ( "context" "sync" proto "github.com/hyperledger/fabric-protos-go/gossip" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/metrics" "github.com/hyperledger/fabric/gossip/protoext" "github.com/hyperledger/fabric/gossip/util" "github.com/pkg/errors" "google.golang.org/grpc" ) type handler func(message *protoext.SignedGossipMessage) type blockingBehavior bool const ( blockingSend = blockingBehavior(true) nonBlockingSend = blockingBehavior(false) ) type connFactory interface { createConnection(endpoint string, pkiID common.PKIidType) (*connection, error) } type connectionStore struct { config ConnConfig logger util.Logger // logger isClosing bool // whether this connection store is shutting down shutdownOnce sync.Once // ensure shutdown is only called once per connectionStore connFactory connFactory // creates a connection to remote peer sync.RWMutex // synchronize access to shared variables pki2Conn map[string]*connection // mapping between pkiID to connections destinationLocks map[string]*sync.Mutex // mapping between pkiIDs and locks, // used to prevent concurrent connection establishment to the same remote endpoint } func newConnStore(connFactory connFactory, logger util.Logger, config ConnConfig) *connectionStore { return &connectionStore{ connFactory: connFactory, isClosing: false, pki2Conn: make(map[string]*connection), destinationLocks: make(map[string]*sync.Mutex), logger: logger, config: config, } } func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) { cs.RLock() isClosing := cs.isClosing cs.RUnlock() if isClosing { return nil, errors.Errorf("conn store is closing") } pkiID := peer.PKIID endpoint := peer.Endpoint cs.Lock() destinationLock, hasConnected := cs.destinationLocks[string(pkiID)] if !hasConnected { destinationLock = &sync.Mutex{} cs.destinationLocks[string(pkiID)] = destinationLock } cs.Unlock() destinationLock.Lock() cs.RLock() conn, exists := cs.pki2Conn[string(pkiID)] if exists { cs.RUnlock() destinationLock.Unlock() return conn, nil } cs.RUnlock() createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID) if err == nil { cs.logger.Debugf("Created new connection to %s, %s", endpoint, pkiID) } destinationLock.Unlock() cs.RLock() isClosing = cs.isClosing cs.RUnlock() if isClosing { return nil, errors.Errorf("conn store is closing") } cs.Lock() delete(cs.destinationLocks, string(pkiID)) defer cs.Unlock() // check again, maybe someone connected to us during the connection creation? conn, exists = cs.pki2Conn[string(pkiID)] if exists { if createdConnection != nil { createdConnection.close() } return conn, nil } // no one connected to us AND we failed connecting! if err != nil { return nil, err } // At this point we know the latest PKI-ID of the remote peer. // Perhaps we are trying to connect to a peer that changed its PKI-ID. // Ensure that we are not already connected to that PKI-ID, // and if so - close the old connection in order to keep the latest one. // We keep the latest one because the remote peer is going to close // our old connection anyway once it sensed this connection handshake. if conn, exists := cs.pki2Conn[string(createdConnection.pkiID)]; exists { conn.close() } // at this point in the code, we created a connection to a remote peer conn = createdConnection cs.pki2Conn[string(createdConnection.pkiID)] = conn go conn.serviceConnection() return conn, nil } func (cs *connectionStore) connNum() int { cs.RLock() defer cs.RUnlock() return len(cs.pki2Conn) } func (cs *connectionStore) shutdown() { cs.shutdownOnce.Do(func() { cs.Lock() cs.isClosing = true for _, conn := range cs.pki2Conn { conn.close() } cs.pki2Conn = make(map[string]*connection) cs.Unlock() }) } // onConnected closes any connection to the remote peer and creates a new connection object to it in order to have only // one single bi-directional connection between a pair of peers func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *protoext.ConnectionInfo, metrics *metrics.CommMetrics) *connection { cs.Lock() defer cs.Unlock() if c, exists := cs.pki2Conn[string(connInfo.ID)]; exists { c.close() } conn := newConnection(nil, nil, serverStream, metrics, cs.config) conn.pkiID = connInfo.ID conn.info = connInfo conn.logger = cs.logger cs.pki2Conn[string(connInfo.ID)] = conn return conn } func (cs *connectionStore) closeConnByPKIid(pkiID common.PKIidType) { cs.Lock() defer cs.Unlock() if conn, exists := cs.pki2Conn[string(pkiID)]; exists { conn.close() delete(cs.pki2Conn, string(pkiID)) } } func newConnection(cl proto.GossipClient, c *grpc.ClientConn, s stream, metrics *metrics.CommMetrics, config ConnConfig) *connection { connection := &connection{ metrics: metrics, outBuff: make(chan *msgSending, config.SendBuffSize), cl: cl, conn: c, gossipStream: s, stopChan: make(chan struct{}, 1), recvBuffSize: config.RecvBuffSize, } return connection } // ConnConfig is the configuration required to initialize a new conn type ConnConfig struct { RecvBuffSize int SendBuffSize int } type connection struct { recvBuffSize int metrics *metrics.CommMetrics cancel context.CancelFunc info *protoext.ConnectionInfo outBuff chan *msgSending logger util.Logger // logger pkiID common.PKIidType // pkiID of the remote endpoint handler handler // function to invoke upon a message reception conn *grpc.ClientConn // gRPC connection to remote endpoint cl proto.GossipClient // gRPC stub of remote endpoint gossipStream stream // there can only be one stopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routine stopOnce sync.Once // once to ensure close is called only once } func (conn *connection) close() { conn.stopOnce.Do(func() { close(conn.stopChan) if conn.conn != nil { conn.conn.Close() } if conn.cancel != nil { conn.cancel() } }) } func (conn *connection) send(msg *protoext.SignedGossipMessage, onErr func(error), shouldBlock blockingBehavior) { m := &msgSending{ envelope: msg.Envelope, onErr: onErr, } select { case conn.outBuff <- m: // room in channel, successfully sent message, nothing to do case <-conn.stopChan: conn.logger.Debugf("Aborting send() to %s because connection is closing", conn.info.Endpoint) default: // did not send if shouldBlock { select { case conn.outBuff <- m: // try again, and wait to send case <-conn.stopChan: // stop blocking if the connection is closing } } else { conn.metrics.BufferOverflow.Add(1) conn.logger.Debugf("Buffer to %s overflowed, dropping message %s", conn.info.Endpoint, msg) } } } func (conn *connection) serviceConnection() error { errChan := make(chan error, 1) msgChan := make(chan *protoext.SignedGossipMessage, conn.recvBuffSize) // Call stream.Recv() asynchronously in readFromStream(), // and wait for either the Recv() call to end, // or a signal to close the connection, which exits // the method and makes the Recv() call to fail in the // readFromStream() method go conn.readFromStream(errChan, msgChan) go conn.writeToStream() for { select { case <-conn.stopChan: conn.logger.Debug("Closing reading from stream") return nil case err := <-errChan: return err case msg := <-msgChan: conn.handler(msg) } } } func (conn *connection) writeToStream() { stream := conn.gossipStream for { select { case m := <-conn.outBuff: err := stream.Send(m.envelope) if err != nil { go m.onErr(err) return } conn.metrics.SentMessages.Add(1) case <-conn.stopChan: conn.logger.Debug("Closing writing to stream") return } } } func (conn *connection) readFromStream(errChan chan error, msgChan chan *protoext.SignedGossipMessage) { stream := conn.gossipStream for { select { case <-conn.stopChan: return default: envelope, err := stream.Recv() if err != nil { errChan <- err conn.logger.Debugf("Got error, aborting: %v", err) return } conn.metrics.ReceivedMessages.Add(1) msg, err := protoext.EnvelopeToGossipMessage(envelope) if err != nil { errChan <- err conn.logger.Warningf("Got error, aborting: %v", err) return } select { case <-conn.stopChan: case msgChan <- msg: } } } } type msgSending struct { envelope *proto.Envelope onErr func(error) } //go:generate mockery -dir . -name MockStream -case underscore -output mocks/ type MockStream interface { proto.Gossip_GossipStreamClient }