go_study/fabric-main/internal/pkg/peer/orderers/connection.go

215 lines
5.8 KiB
Go

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package orderers
import (
"bytes"
"crypto/sha256"
"math/rand"
"sync"
"github.com/hyperledger/fabric/common/flogging"
"github.com/pkg/errors"
)
type ConnectionSource struct {
mutex sync.RWMutex
allEndpoints []*Endpoint
orgToEndpointsHash map[string][]byte
logger *flogging.FabricLogger
overrides map[string]*Endpoint
}
type Endpoint struct {
Address string
RootCerts [][]byte
Refreshed chan struct{}
}
type OrdererOrg struct {
Addresses []string
RootCerts [][]byte
}
func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource {
return &ConnectionSource{
orgToEndpointsHash: map[string][]byte{},
logger: logger,
overrides: overrides,
}
}
func (cs *ConnectionSource) RandomEndpoint() (*Endpoint, error) {
cs.mutex.RLock()
defer cs.mutex.RUnlock()
if len(cs.allEndpoints) == 0 {
return nil, errors.Errorf("no endpoints currently defined")
}
return cs.allEndpoints[rand.Intn(len(cs.allEndpoints))], nil
}
func (cs *ConnectionSource) Endpoints() []*Endpoint {
cs.mutex.RLock()
defer cs.mutex.RUnlock()
return cs.allEndpoints
}
func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]OrdererOrg) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.logger.Debug("Processing updates for orderer endpoints")
newOrgToEndpointsHash := map[string][]byte{}
anyChange := false
hasOrgEndpoints := false
for orgName, org := range orgs {
hasher := sha256.New()
for _, cert := range org.RootCerts {
hasher.Write(cert)
}
for _, address := range org.Addresses {
hasOrgEndpoints = true
hasher.Write([]byte(address))
}
hash := hasher.Sum(nil)
newOrgToEndpointsHash[orgName] = hash
lastHash, ok := cs.orgToEndpointsHash[orgName]
if ok && bytes.Equal(hash, lastHash) {
continue
}
cs.logger.Debugf("Found orderer org '%s' has updates", orgName)
anyChange = true
}
for orgName := range cs.orgToEndpointsHash {
if _, ok := orgs[orgName]; !ok {
// An org that used to exist has been removed
cs.logger.Debugf("Found orderer org '%s' has been removed", orgName)
anyChange = true
}
}
cs.orgToEndpointsHash = newOrgToEndpointsHash
if hasOrgEndpoints && len(globalAddrs) > 0 {
cs.logger.Warning("Config defines both orderer org specific endpoints and global endpoints, global endpoints will be ignored")
}
if !hasOrgEndpoints && len(globalAddrs) != len(cs.allEndpoints) {
cs.logger.Debugf("There are no org endpoints, but the global addresses have changed")
anyChange = true
}
if !hasOrgEndpoints && !anyChange && len(globalAddrs) == len(cs.allEndpoints) {
// There are no org endpoints, there were no org endpoints, and the number
// of the update's global endpoints is the same as the number of existing endpoints.
// So, we check if any of the endpoints addresses differ.
newAddresses := map[string]struct{}{}
for _, address := range globalAddrs {
newAddresses[address] = struct{}{}
}
for _, endpoint := range cs.allEndpoints {
delete(newAddresses, endpoint.Address)
}
// Set anyChange true if some new address was not
// in the set of old endpoints.
anyChange = len(newAddresses) != 0
if anyChange {
cs.logger.Debugf("There are no org endpoints, but some of the global addresses have changed")
}
}
if !anyChange {
cs.logger.Debugf("No orderer endpoint addresses or TLS certs were changed")
// No TLS certs changed, no org specified endpoints changed,
// and if we are using global endpoints, they are the same
// as our last set. No need to update anything.
return
}
for _, endpoint := range cs.allEndpoints {
// Alert any existing consumers that have a reference to the old endpoints
// that their reference is now stale and they should get a new one.
// This is done even for endpoints which have the same TLS certs and address
// but this is desirable to help load balance. For instance if only
// one orderer were defined, and the config is updated to include 4 more, we
// want the peers to disconnect from that original orderer and reconnect
// evenly across the now five.
close(endpoint.Refreshed)
}
cs.allEndpoints = nil
var globalRootCerts [][]byte
for _, org := range orgs {
var rootCerts [][]byte
for _, rootCert := range org.RootCerts {
if hasOrgEndpoints {
rootCerts = append(rootCerts, rootCert)
} else {
globalRootCerts = append(globalRootCerts, rootCert)
}
}
// Note, if !hasOrgEndpoints, this for loop is a no-op
for _, address := range org.Addresses {
overrideEndpoint, ok := cs.overrides[address]
if ok {
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Address: overrideEndpoint.Address,
RootCerts: overrideEndpoint.RootCerts,
Refreshed: make(chan struct{}),
})
continue
}
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Address: address,
RootCerts: rootCerts,
Refreshed: make(chan struct{}),
})
}
}
if len(cs.allEndpoints) != 0 {
cs.logger.Debugf("Returning an orderer connection pool source with org specific endpoints only")
// There are some org specific endpoints, so we do not
// add any of the global endpoints to our pool.
return
}
for _, address := range globalAddrs {
overrideEndpoint, ok := cs.overrides[address]
if ok {
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Address: overrideEndpoint.Address,
RootCerts: overrideEndpoint.RootCerts,
Refreshed: make(chan struct{}),
})
continue
}
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Address: address,
RootCerts: globalRootCerts,
Refreshed: make(chan struct{}),
})
}
cs.logger.Debugf("Returning an orderer connection pool source with global endpoints only")
}