663 lines
19 KiB
Go
663 lines
19 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package comm
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"reflect"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
proto "github.com/hyperledger/fabric-protos-go/gossip"
|
|
"github.com/hyperledger/fabric/gossip/api"
|
|
"github.com/hyperledger/fabric/gossip/common"
|
|
"github.com/hyperledger/fabric/gossip/identity"
|
|
"github.com/hyperledger/fabric/gossip/metrics"
|
|
"github.com/hyperledger/fabric/gossip/protoext"
|
|
"github.com/hyperledger/fabric/gossip/util"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/peer"
|
|
)
|
|
|
|
const (
|
|
handshakeTimeout = time.Second * 10
|
|
DefDialTimeout = time.Second * 3
|
|
DefConnTimeout = time.Second * 2
|
|
DefRecvBuffSize = 20
|
|
DefSendBuffSize = 20
|
|
)
|
|
|
|
// errProbe is the sentinel returned by authenticateRemotePeer, together with
// the connection info, when the remote peer flagged its connection message
// as a probe. GossipStream compares against it to end the stream gracefully.
var errProbe = errors.New("probe")
|
|
|
|
// SecurityAdvisor defines an external auxiliary object
|
|
// that provides security and identity related capabilities
|
|
type SecurityAdvisor interface {
|
|
// OrgByPeerIdentity returns the organization identity of the given PeerIdentityType
|
|
OrgByPeerIdentity(api.PeerIdentityType) api.OrgIdentityType
|
|
}
|
|
|
|
func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
|
|
if len(opts) == 0 {
|
|
c.logger.Warning("Given an empty set of grpc.DialOption, aborting")
|
|
return
|
|
}
|
|
c.opts = opts
|
|
}
|
|
|
|
// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
|
|
func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper,
|
|
peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, sa api.SecurityAdvisor,
|
|
commMetrics *metrics.CommMetrics, config CommConfig, dialOpts ...grpc.DialOption) (Comm, error) {
|
|
commInst := &commImpl{
|
|
sa: sa,
|
|
pubSub: util.NewPubSub(),
|
|
PKIID: idStore.GetPKIidOfCert(peerIdentity),
|
|
idMapper: idStore,
|
|
logger: util.GetLogger(util.CommLogger, ""),
|
|
peerIdentity: peerIdentity,
|
|
opts: dialOpts,
|
|
secureDialOpts: secureDialOpts,
|
|
msgPublisher: NewChannelDemultiplexer(),
|
|
lock: &sync.Mutex{},
|
|
deadEndpoints: make(chan common.PKIidType, 100),
|
|
identityChanges: make(chan common.PKIidType, 1),
|
|
stopping: int32(0),
|
|
exitChan: make(chan struct{}),
|
|
subscriptions: make([]chan protoext.ReceivedMessage, 0),
|
|
tlsCerts: certs,
|
|
metrics: commMetrics,
|
|
dialTimeout: config.DialTimeout,
|
|
connTimeout: config.ConnTimeout,
|
|
recvBuffSize: config.RecvBuffSize,
|
|
sendBuffSize: config.SendBuffSize,
|
|
}
|
|
|
|
connConfig := ConnConfig{
|
|
RecvBuffSize: config.RecvBuffSize,
|
|
SendBuffSize: config.SendBuffSize,
|
|
}
|
|
|
|
commInst.connStore = newConnStore(commInst, commInst.logger, connConfig)
|
|
|
|
proto.RegisterGossipServer(s, commInst)
|
|
|
|
return commInst, nil
|
|
}
|
|
|
|
// CommConfig is the configuration required to initialize a new comm
|
|
type CommConfig struct {
|
|
DialTimeout time.Duration // Dial timeout
|
|
ConnTimeout time.Duration // Connection timeout
|
|
RecvBuffSize int // Buffer size of received messages
|
|
SendBuffSize int // Buffer size of sending messages
|
|
}
|
|
|
|
type commImpl struct {
|
|
sa api.SecurityAdvisor
|
|
tlsCerts *common.TLSCertificates
|
|
pubSub *util.PubSub
|
|
peerIdentity api.PeerIdentityType
|
|
idMapper identity.Mapper
|
|
logger util.Logger
|
|
opts []grpc.DialOption
|
|
secureDialOpts func() []grpc.DialOption
|
|
connStore *connectionStore
|
|
PKIID []byte
|
|
deadEndpoints chan common.PKIidType
|
|
identityChanges chan common.PKIidType
|
|
msgPublisher *ChannelDeMultiplexer
|
|
lock *sync.Mutex
|
|
exitChan chan struct{}
|
|
stopWG sync.WaitGroup
|
|
subscriptions []chan protoext.ReceivedMessage
|
|
stopping int32
|
|
metrics *metrics.CommMetrics
|
|
dialTimeout time.Duration
|
|
connTimeout time.Duration
|
|
recvBuffSize int
|
|
sendBuffSize int
|
|
}
|
|
|
|
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
|
|
var err error
|
|
var cc *grpc.ClientConn
|
|
var stream proto.Gossip_GossipStreamClient
|
|
var pkiID common.PKIidType
|
|
var connInfo *protoext.ConnectionInfo
|
|
var dialOpts []grpc.DialOption
|
|
|
|
c.logger.Debug("Entering", endpoint, expectedPKIID)
|
|
defer c.logger.Debug("Exiting")
|
|
|
|
if c.isStopping() {
|
|
return nil, errors.New("Stopping")
|
|
}
|
|
dialOpts = append(dialOpts, c.secureDialOpts()...)
|
|
dialOpts = append(dialOpts, grpc.WithBlock())
|
|
dialOpts = append(dialOpts, c.opts...)
|
|
ctx := context.Background()
|
|
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
|
|
defer cancel()
|
|
cc, err = grpc.DialContext(ctx, endpoint, dialOpts...)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
cl := proto.NewGossipClient(cc)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), c.connTimeout)
|
|
defer cancel()
|
|
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
|
|
cc.Close()
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
if stream, err = cl.GossipStream(ctx); err == nil {
|
|
connInfo, err = c.authenticateRemotePeer(stream, true, false)
|
|
if err == nil {
|
|
pkiID = connInfo.ID
|
|
// PKIID is nil when we don't know the remote PKI id's
|
|
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
|
|
actualOrg := c.sa.OrgByPeerIdentity(connInfo.Identity)
|
|
// If the identity isn't present, it's nil - therefore OrgByPeerIdentity would
|
|
// return nil too and thus would be different than the actual organization
|
|
identity, _ := c.idMapper.Get(expectedPKIID)
|
|
oldOrg := c.sa.OrgByPeerIdentity(identity)
|
|
if !bytes.Equal(actualOrg, oldOrg) {
|
|
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
|
|
cc.Close()
|
|
cancel()
|
|
return nil, errors.New("authentication failure")
|
|
} else {
|
|
c.logger.Infof("Peer %s changed its PKI-ID from %s to %s", endpoint, expectedPKIID, pkiID)
|
|
c.identityChanges <- expectedPKIID
|
|
}
|
|
}
|
|
connConfig := ConnConfig{
|
|
RecvBuffSize: c.recvBuffSize,
|
|
SendBuffSize: c.sendBuffSize,
|
|
}
|
|
conn := newConnection(cl, cc, stream, c.metrics, connConfig)
|
|
conn.pkiID = pkiID
|
|
conn.info = connInfo
|
|
conn.logger = c.logger
|
|
conn.cancel = cancel
|
|
|
|
h := func(m *protoext.SignedGossipMessage) {
|
|
c.logger.Debug("Got message:", m)
|
|
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
|
|
conn: conn,
|
|
SignedGossipMessage: m,
|
|
connInfo: connInfo,
|
|
})
|
|
}
|
|
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
|
|
return conn, nil
|
|
}
|
|
c.logger.Warningf("Authentication failed: %+v", err)
|
|
}
|
|
cc.Close()
|
|
cancel()
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
func (c *commImpl) Send(msg *protoext.SignedGossipMessage, peers ...*RemotePeer) {
|
|
if c.isStopping() || len(peers) == 0 {
|
|
return
|
|
}
|
|
c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
|
|
|
|
for _, peer := range peers {
|
|
go func(peer *RemotePeer, msg *protoext.SignedGossipMessage) {
|
|
c.sendToEndpoint(peer, msg, nonBlockingSend)
|
|
}(peer, msg)
|
|
}
|
|
}
|
|
|
|
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *protoext.SignedGossipMessage, shouldBlock blockingBehavior) {
|
|
if c.isStopping() {
|
|
return
|
|
}
|
|
c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
|
|
defer c.logger.Debug("Exiting")
|
|
var err error
|
|
|
|
conn, err := c.connStore.getConnection(peer)
|
|
if err == nil {
|
|
disConnectOnErr := func(err error) {
|
|
c.logger.Warningf("%v isn't responsive: %v", peer, err)
|
|
c.disconnect(peer.PKIID)
|
|
conn.close()
|
|
}
|
|
conn.send(msg, disConnectOnErr, shouldBlock)
|
|
return
|
|
}
|
|
c.logger.Warningf("Failed obtaining connection for %v reason: %v", peer, err)
|
|
c.disconnect(peer.PKIID)
|
|
}
|
|
|
|
func (c *commImpl) isStopping() bool {
|
|
return atomic.LoadInt32(&c.stopping) == int32(1)
|
|
}
|
|
|
|
func (c *commImpl) Probe(remotePeer *RemotePeer) error {
|
|
var dialOpts []grpc.DialOption
|
|
endpoint := remotePeer.Endpoint
|
|
pkiID := remotePeer.PKIID
|
|
if c.isStopping() {
|
|
return errors.New("stopping")
|
|
}
|
|
c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID)
|
|
dialOpts = append(dialOpts, c.secureDialOpts()...)
|
|
dialOpts = append(dialOpts, grpc.WithBlock())
|
|
dialOpts = append(dialOpts, c.opts...)
|
|
ctx := context.Background()
|
|
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
|
|
defer cancel()
|
|
cc, err := grpc.DialContext(ctx, remotePeer.Endpoint, dialOpts...)
|
|
if err != nil {
|
|
c.logger.Debugf("Returning %v", err)
|
|
return err
|
|
}
|
|
defer cc.Close()
|
|
cl := proto.NewGossipClient(cc)
|
|
ctx, cancel = context.WithTimeout(context.Background(), c.connTimeout)
|
|
defer cancel()
|
|
_, err = cl.Ping(ctx, &proto.Empty{})
|
|
c.logger.Debugf("Returning %v", err)
|
|
return err
|
|
}
|
|
|
|
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error) {
|
|
var dialOpts []grpc.DialOption
|
|
dialOpts = append(dialOpts, c.secureDialOpts()...)
|
|
dialOpts = append(dialOpts, grpc.WithBlock())
|
|
dialOpts = append(dialOpts, c.opts...)
|
|
ctx := context.Background()
|
|
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
|
|
defer cancel()
|
|
cc, err := grpc.DialContext(ctx, remotePeer.Endpoint, dialOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer cc.Close()
|
|
|
|
cl := proto.NewGossipClient(cc)
|
|
ctx, cancel = context.WithTimeout(context.Background(), c.connTimeout)
|
|
defer cancel()
|
|
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), handshakeTimeout)
|
|
defer cancel()
|
|
stream, err := cl.GossipStream(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
connInfo, err := c.authenticateRemotePeer(stream, true, true)
|
|
if err != nil {
|
|
c.logger.Warningf("Authentication failed: %v", err)
|
|
return nil, err
|
|
}
|
|
if len(remotePeer.PKIID) > 0 && !bytes.Equal(connInfo.ID, remotePeer.PKIID) {
|
|
return nil, errors.New("PKI-ID of remote peer doesn't match expected PKI-ID")
|
|
}
|
|
return connInfo.Identity, nil
|
|
}
|
|
|
|
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan protoext.ReceivedMessage {
|
|
genericChan := c.msgPublisher.AddChannel(acceptor)
|
|
specificChan := make(chan protoext.ReceivedMessage, 10)
|
|
|
|
if c.isStopping() {
|
|
c.logger.Warning("Accept() called but comm module is stopping, returning empty channel")
|
|
return specificChan
|
|
}
|
|
|
|
c.lock.Lock()
|
|
c.subscriptions = append(c.subscriptions, specificChan)
|
|
c.lock.Unlock()
|
|
|
|
c.stopWG.Add(1)
|
|
go func() {
|
|
defer c.logger.Debug("Exiting Accept() loop")
|
|
|
|
defer c.stopWG.Done()
|
|
|
|
for {
|
|
select {
|
|
case msg, channelOpen := <-genericChan:
|
|
if !channelOpen {
|
|
return
|
|
}
|
|
select {
|
|
case specificChan <- msg.(*ReceivedMessageImpl):
|
|
case <-c.exitChan:
|
|
return
|
|
}
|
|
case <-c.exitChan:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return specificChan
|
|
}
|
|
|
|
func (c *commImpl) PresumedDead() <-chan common.PKIidType {
|
|
return c.deadEndpoints
|
|
}
|
|
|
|
func (c *commImpl) IdentitySwitch() <-chan common.PKIidType {
|
|
return c.identityChanges
|
|
}
|
|
|
|
func (c *commImpl) CloseConn(peer *RemotePeer) {
|
|
c.logger.Debug("Closing connection for", peer)
|
|
c.connStore.closeConnByPKIid(peer.PKIID)
|
|
}
|
|
|
|
func (c *commImpl) closeSubscriptions() {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
for _, ch := range c.subscriptions {
|
|
close(ch)
|
|
}
|
|
}
|
|
|
|
func (c *commImpl) Stop() {
|
|
if !atomic.CompareAndSwapInt32(&c.stopping, 0, int32(1)) {
|
|
return
|
|
}
|
|
c.logger.Info("Stopping")
|
|
defer c.logger.Info("Stopped")
|
|
c.connStore.shutdown()
|
|
c.logger.Debug("Shut down connection store, connection count:", c.connStore.connNum())
|
|
c.msgPublisher.Close()
|
|
close(c.exitChan)
|
|
c.stopWG.Wait()
|
|
c.closeSubscriptions()
|
|
}
|
|
|
|
func (c *commImpl) GetPKIid() common.PKIidType {
|
|
return c.PKIID
|
|
}
|
|
|
|
func extractRemoteAddress(stream stream) string {
|
|
var remoteAddress string
|
|
p, ok := peer.FromContext(stream.Context())
|
|
if ok {
|
|
if address := p.Addr; address != nil {
|
|
remoteAddress = address.String()
|
|
}
|
|
}
|
|
return remoteAddress
|
|
}
|
|
|
|
func (c *commImpl) authenticateRemotePeer(stream stream, initiator, isProbe bool) (*protoext.ConnectionInfo, error) {
|
|
ctx := stream.Context()
|
|
remoteAddress := extractRemoteAddress(stream)
|
|
remoteCertHash := extractCertificateHashFromContext(ctx)
|
|
var err error
|
|
var cMsg *protoext.SignedGossipMessage
|
|
useTLS := c.tlsCerts != nil
|
|
var selfCertHash []byte
|
|
|
|
if useTLS {
|
|
certReference := c.tlsCerts.TLSServerCert
|
|
if initiator {
|
|
certReference = c.tlsCerts.TLSClientCert
|
|
}
|
|
selfCertHash = certHashFromRawCert(certReference.Load().(*tls.Certificate).Certificate[0])
|
|
}
|
|
|
|
signer := func(msg []byte) ([]byte, error) {
|
|
return c.idMapper.Sign(msg)
|
|
}
|
|
|
|
// TLS enabled but not detected on other side
|
|
if useTLS && len(remoteCertHash) == 0 {
|
|
c.logger.Warningf("%s didn't send TLS certificate", remoteAddress)
|
|
return nil, errors.New("no TLS certificate")
|
|
}
|
|
|
|
cMsg, err = c.createConnectionMsg(c.PKIID, selfCertHash, c.peerIdentity, signer, isProbe)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.logger.Debug("Sending", cMsg, "to", remoteAddress)
|
|
stream.Send(cMsg.Envelope)
|
|
m, err := readWithTimeout(stream, c.connTimeout, remoteAddress)
|
|
if err != nil {
|
|
c.logger.Warningf("Failed reading message from %s, reason: %v", remoteAddress, err)
|
|
return nil, err
|
|
}
|
|
receivedMsg := m.GetConn()
|
|
if receivedMsg == nil {
|
|
c.logger.Warning("Expected connection message from", remoteAddress, "but got", receivedMsg)
|
|
return nil, errors.New("wrong type")
|
|
}
|
|
|
|
if receivedMsg.PkiId == nil {
|
|
c.logger.Warningf("%s didn't send a pkiID", remoteAddress)
|
|
return nil, errors.New("no PKI-ID")
|
|
}
|
|
|
|
c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
|
|
err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Identity)
|
|
if err != nil {
|
|
c.logger.Warningf("Identity store rejected %s : %v", remoteAddress, err)
|
|
return nil, err
|
|
}
|
|
|
|
connInfo := &protoext.ConnectionInfo{
|
|
ID: receivedMsg.PkiId,
|
|
Identity: receivedMsg.Identity,
|
|
Endpoint: remoteAddress,
|
|
Auth: &protoext.AuthInfo{
|
|
Signature: m.Signature,
|
|
SignedData: m.Payload,
|
|
},
|
|
}
|
|
|
|
// if TLS is enabled and detected, verify remote peer
|
|
if useTLS {
|
|
// If the remote peer sent its TLS certificate, make sure it actually matches the TLS cert
|
|
// that the peer used.
|
|
if !bytes.Equal(remoteCertHash, receivedMsg.TlsCertHash) {
|
|
return nil, errors.Errorf("Expected %v in remote hash of TLS cert, but got %v", remoteCertHash, receivedMsg.TlsCertHash)
|
|
}
|
|
}
|
|
// Final step - verify the signature on the connection message itself
|
|
verifier := func(peerIdentity []byte, signature, message []byte) error {
|
|
pkiID := c.idMapper.GetPKIidOfCert(peerIdentity)
|
|
return c.idMapper.Verify(pkiID, signature, message)
|
|
}
|
|
err = m.Verify(receivedMsg.Identity, verifier)
|
|
if err != nil {
|
|
c.logger.Errorf("Failed verifying signature from %s : %v", remoteAddress, err)
|
|
return nil, err
|
|
}
|
|
|
|
c.logger.Debug("Authenticated", remoteAddress)
|
|
|
|
if receivedMsg.Probe {
|
|
return connInfo, errProbe
|
|
}
|
|
|
|
return connInfo, nil
|
|
}
|
|
|
|
// SendWithAck sends a message to remote peers, waiting for acknowledgement from minAck of them, or until a certain timeout expires
|
|
func (c *commImpl) SendWithAck(msg *protoext.SignedGossipMessage, timeout time.Duration, minAck int, peers ...*RemotePeer) AggregatedSendResult {
|
|
if len(peers) == 0 {
|
|
return nil
|
|
}
|
|
var err error
|
|
|
|
// Roll a random NONCE to be used as a send ID to differentiate
|
|
// between different invocations
|
|
msg.Nonce = util.RandomUInt64()
|
|
// Replace the envelope in the message to update the NONCE
|
|
msg, err = protoext.NoopSign(msg.GossipMessage)
|
|
|
|
if c.isStopping() || err != nil {
|
|
if err == nil {
|
|
err = errors.New("comm is stopping")
|
|
}
|
|
results := []SendResult{}
|
|
for _, p := range peers {
|
|
results = append(results, SendResult{
|
|
error: err,
|
|
RemotePeer: *p,
|
|
})
|
|
}
|
|
return results
|
|
}
|
|
c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
|
|
sndFunc := func(peer *RemotePeer, msg *protoext.SignedGossipMessage) {
|
|
c.sendToEndpoint(peer, msg, blockingSend)
|
|
}
|
|
// Subscribe to acks
|
|
subscriptions := make(map[string]func() error)
|
|
for _, p := range peers {
|
|
topic := topicForAck(msg.Nonce, p.PKIID)
|
|
sub := c.pubSub.Subscribe(topic, timeout)
|
|
subscriptions[string(p.PKIID)] = func() error {
|
|
msg, err := sub.Listen()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if msg, isAck := msg.(*proto.Acknowledgement); !isAck {
|
|
return errors.Errorf("received a message of type %s, expected *proto.Acknowledgement", reflect.TypeOf(msg))
|
|
} else {
|
|
if msg.Error != "" {
|
|
return errors.New(msg.Error)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
waitForAck := func(p *RemotePeer) error {
|
|
return subscriptions[string(p.PKIID)]()
|
|
}
|
|
ackOperation := newAckSendOperation(sndFunc, waitForAck)
|
|
return ackOperation.send(msg, minAck, peers...)
|
|
}
|
|
|
|
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
|
|
if c.isStopping() {
|
|
return errors.New("shutting down")
|
|
}
|
|
connInfo, err := c.authenticateRemotePeer(stream, false, false)
|
|
|
|
if err == errProbe {
|
|
c.logger.Infof("Peer %s (%s) probed us", connInfo.ID, connInfo.Endpoint)
|
|
return nil
|
|
}
|
|
|
|
if err != nil {
|
|
c.logger.Errorf("Authentication failed: %v", err)
|
|
return err
|
|
}
|
|
c.logger.Debug("Servicing", extractRemoteAddress(stream))
|
|
|
|
conn := c.connStore.onConnected(stream, connInfo, c.metrics)
|
|
|
|
h := func(m *protoext.SignedGossipMessage) {
|
|
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
|
|
conn: conn,
|
|
SignedGossipMessage: m,
|
|
connInfo: connInfo,
|
|
})
|
|
}
|
|
|
|
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
|
|
|
|
defer func() {
|
|
c.logger.Debug("Client", extractRemoteAddress(stream), " disconnected")
|
|
c.connStore.closeConnByPKIid(connInfo.ID)
|
|
}()
|
|
|
|
return conn.serviceConnection()
|
|
}
|
|
|
|
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) {
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
func (c *commImpl) disconnect(pkiID common.PKIidType) {
|
|
select {
|
|
case c.deadEndpoints <- pkiID:
|
|
case <-c.exitChan:
|
|
return
|
|
}
|
|
|
|
c.connStore.closeConnByPKIid(pkiID)
|
|
}
|
|
|
|
func readWithTimeout(stream stream, timeout time.Duration, address string) (*protoext.SignedGossipMessage, error) {
|
|
incChan := make(chan *protoext.SignedGossipMessage, 1)
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
if m, err := stream.Recv(); err == nil {
|
|
msg, err := protoext.EnvelopeToGossipMessage(m)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
incChan <- msg
|
|
}
|
|
}()
|
|
select {
|
|
case <-time.After(timeout):
|
|
return nil, errors.Errorf("timed out waiting for connection message from %s", address)
|
|
case m := <-incChan:
|
|
return m, nil
|
|
case err := <-errChan:
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer protoext.Signer, isProbe bool) (*protoext.SignedGossipMessage, error) {
|
|
m := &proto.GossipMessage{
|
|
Tag: proto.GossipMessage_EMPTY,
|
|
Nonce: 0,
|
|
Content: &proto.GossipMessage_Conn{
|
|
Conn: &proto.ConnEstablish{
|
|
TlsCertHash: certHash,
|
|
Identity: cert,
|
|
PkiId: pkiID,
|
|
Probe: isProbe,
|
|
},
|
|
},
|
|
}
|
|
sMsg := &protoext.SignedGossipMessage{
|
|
GossipMessage: m,
|
|
}
|
|
_, err := sMsg.Sign(signer)
|
|
return sMsg, errors.WithStack(err)
|
|
}
|
|
|
|
type stream interface {
|
|
Send(envelope *proto.Envelope) error
|
|
Recv() (*proto.Envelope, error)
|
|
Context() context.Context
|
|
}
|
|
|
|
func topicForAck(nonce uint64, pkiID common.PKIidType) string {
|
|
return fmt.Sprintf("%d %s", nonce, hex.EncodeToString(pkiID))
|
|
}
|