287 lines
8.4 KiB
Go
287 lines
8.4 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package identity
|
|
|
|
import (
|
|
"bytes"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/hyperledger/fabric/gossip/api"
|
|
"github.com/hyperledger/fabric/gossip/common"
|
|
errors "github.com/pkg/errors"
|
|
)
|
|
|
|
// identityUsageThreshold sets the maximum time that an identity
|
|
// can not be used to verify some signature before it will be deleted
|
|
var usageThreshold = time.Hour
|
|
|
|
// Mapper holds mappings between pkiID
|
|
// to certificates(identities) of peers
|
|
type Mapper interface {
|
|
// Put associates an identity to its given pkiID, and returns an error
|
|
// in case the given pkiID doesn't match the identity
|
|
Put(pkiID common.PKIidType, identity api.PeerIdentityType) error
|
|
|
|
// Get returns the identity of a given pkiID, or error if such an identity
|
|
// isn't found
|
|
Get(pkiID common.PKIidType) (api.PeerIdentityType, error)
|
|
|
|
// Sign signs a message, returns a signed message on success
|
|
// or an error on failure
|
|
Sign(msg []byte) ([]byte, error)
|
|
|
|
// Verify verifies a signed message
|
|
Verify(vkID, signature, message []byte) error
|
|
|
|
// GetPKIidOfCert returns the PKI-ID of a certificate
|
|
GetPKIidOfCert(api.PeerIdentityType) common.PKIidType
|
|
|
|
// SuspectPeers re-validates all peers that match the given predicate
|
|
SuspectPeers(isSuspected api.PeerSuspector)
|
|
|
|
// IdentityInfo returns information known peer identities
|
|
IdentityInfo() api.PeerIdentitySet
|
|
|
|
// Stop stops all background computations of the Mapper
|
|
Stop()
|
|
}
|
|
|
|
type purgeTrigger func(pkiID common.PKIidType, identity api.PeerIdentityType)
|
|
|
|
// identityMapperImpl is a struct that implements Mapper
|
|
type identityMapperImpl struct {
|
|
onPurge purgeTrigger
|
|
mcs api.MessageCryptoService
|
|
sa api.SecurityAdvisor
|
|
pkiID2Cert map[string]*storedIdentity
|
|
sync.RWMutex
|
|
stopChan chan struct{}
|
|
once sync.Once
|
|
selfPKIID string
|
|
}
|
|
|
|
// NewIdentityMapper method, all we need is a reference to a MessageCryptoService
|
|
func NewIdentityMapper(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, onPurge purgeTrigger, sa api.SecurityAdvisor) Mapper {
|
|
selfPKIID := mcs.GetPKIidOfCert(selfIdentity)
|
|
idMapper := &identityMapperImpl{
|
|
onPurge: onPurge,
|
|
mcs: mcs,
|
|
pkiID2Cert: make(map[string]*storedIdentity),
|
|
stopChan: make(chan struct{}),
|
|
selfPKIID: string(selfPKIID),
|
|
sa: sa,
|
|
}
|
|
if err := idMapper.Put(selfPKIID, selfIdentity); err != nil {
|
|
panic(errors.Wrap(err, "Failed putting our own identity into the identity mapper"))
|
|
}
|
|
go idMapper.periodicalPurgeUnusedIdentities()
|
|
return idMapper
|
|
}
|
|
|
|
func (is *identityMapperImpl) periodicalPurgeUnusedIdentities() {
|
|
usageTh := GetIdentityUsageThreshold()
|
|
for {
|
|
select {
|
|
case <-is.stopChan:
|
|
return
|
|
case <-time.After(usageTh / 10):
|
|
is.SuspectPeers(func(_ api.PeerIdentityType) bool {
|
|
return false
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// put associates an identity to its given pkiID, and returns an error
|
|
// in case the given pkiID doesn't match the identity
|
|
func (is *identityMapperImpl) Put(pkiID common.PKIidType, identity api.PeerIdentityType) error {
|
|
if pkiID == nil {
|
|
return errors.New("PKIID is nil")
|
|
}
|
|
if identity == nil {
|
|
return errors.New("identity is nil")
|
|
}
|
|
|
|
expirationDate, err := is.mcs.Expiration(identity)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed classifying identity")
|
|
}
|
|
|
|
if err := is.mcs.ValidateIdentity(identity); err != nil {
|
|
return err
|
|
}
|
|
|
|
id := is.mcs.GetPKIidOfCert(identity)
|
|
if !bytes.Equal(pkiID, id) {
|
|
return errors.New("identity doesn't match the computed pkiID")
|
|
}
|
|
|
|
is.Lock()
|
|
defer is.Unlock()
|
|
// Check if identity already exists.
|
|
// If so, no need to overwrite it.
|
|
if _, exists := is.pkiID2Cert[string(pkiID)]; exists {
|
|
return nil
|
|
}
|
|
|
|
var expirationTimer *time.Timer
|
|
if !expirationDate.IsZero() {
|
|
if time.Now().After(expirationDate) {
|
|
return errors.New("gossipping peer identity expired")
|
|
}
|
|
// Identity would be wiped out a millisecond after its expiration date
|
|
timeToLive := time.Until(expirationDate.Add(time.Millisecond))
|
|
expirationTimer = time.AfterFunc(timeToLive, func() {
|
|
is.delete(pkiID, identity)
|
|
})
|
|
}
|
|
|
|
is.pkiID2Cert[string(id)] = newStoredIdentity(pkiID, identity, expirationTimer, is.sa.OrgByPeerIdentity(identity))
|
|
return nil
|
|
}
|
|
|
|
// get returns the identity of a given pkiID, or error if such an identity
|
|
// isn't found
|
|
func (is *identityMapperImpl) Get(pkiID common.PKIidType) (api.PeerIdentityType, error) {
|
|
is.RLock()
|
|
defer is.RUnlock()
|
|
storedIdentity, exists := is.pkiID2Cert[string(pkiID)]
|
|
if !exists {
|
|
return nil, errors.New("PKIID wasn't found")
|
|
}
|
|
return storedIdentity.fetchIdentity(), nil
|
|
}
|
|
|
|
// Sign signs a message, returns a signed message on success
|
|
// or an error on failure
|
|
func (is *identityMapperImpl) Sign(msg []byte) ([]byte, error) {
|
|
return is.mcs.Sign(msg)
|
|
}
|
|
|
|
func (is *identityMapperImpl) Stop() {
|
|
is.once.Do(func() {
|
|
close(is.stopChan)
|
|
})
|
|
}
|
|
|
|
// Verify verifies a signed message
|
|
func (is *identityMapperImpl) Verify(vkID, signature, message []byte) error {
|
|
cert, err := is.Get(vkID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return is.mcs.Verify(cert, signature, message)
|
|
}
|
|
|
|
// GetPKIidOfCert returns the PKI-ID of a certificate
|
|
func (is *identityMapperImpl) GetPKIidOfCert(identity api.PeerIdentityType) common.PKIidType {
|
|
return is.mcs.GetPKIidOfCert(identity)
|
|
}
|
|
|
|
// SuspectPeers re-validates all peers that match the given predicate
|
|
func (is *identityMapperImpl) SuspectPeers(isSuspected api.PeerSuspector) {
|
|
for _, identity := range is.validateIdentities(isSuspected) {
|
|
identity.cancelExpirationTimer()
|
|
is.delete(identity.pkiID, identity.peerIdentity)
|
|
}
|
|
}
|
|
|
|
// validateIdentities returns a list of identities that have been revoked, expired or haven't been
|
|
// used for a long time
|
|
func (is *identityMapperImpl) validateIdentities(isSuspected api.PeerSuspector) []*storedIdentity {
|
|
now := time.Now()
|
|
usageTh := GetIdentityUsageThreshold()
|
|
is.RLock()
|
|
defer is.RUnlock()
|
|
var revokedIdentities []*storedIdentity
|
|
for pkiID, storedIdentity := range is.pkiID2Cert {
|
|
if pkiID != is.selfPKIID && storedIdentity.fetchLastAccessTime().Add(usageTh).Before(now) {
|
|
revokedIdentities = append(revokedIdentities, storedIdentity)
|
|
continue
|
|
}
|
|
if !isSuspected(storedIdentity.peerIdentity) {
|
|
continue
|
|
}
|
|
if err := is.mcs.ValidateIdentity(storedIdentity.fetchIdentity()); err != nil {
|
|
revokedIdentities = append(revokedIdentities, storedIdentity)
|
|
}
|
|
}
|
|
return revokedIdentities
|
|
}
|
|
|
|
// IdentityInfo returns information known peer identities
|
|
func (is *identityMapperImpl) IdentityInfo() api.PeerIdentitySet {
|
|
var res api.PeerIdentitySet
|
|
is.RLock()
|
|
defer is.RUnlock()
|
|
for _, storedIdentity := range is.pkiID2Cert {
|
|
res = append(res, api.PeerIdentityInfo{
|
|
Identity: storedIdentity.peerIdentity,
|
|
PKIId: storedIdentity.pkiID,
|
|
Organization: storedIdentity.orgId,
|
|
})
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (is *identityMapperImpl) delete(pkiID common.PKIidType, identity api.PeerIdentityType) {
|
|
is.Lock()
|
|
defer is.Unlock()
|
|
is.onPurge(pkiID, identity)
|
|
delete(is.pkiID2Cert, string(pkiID))
|
|
}
|
|
|
|
type storedIdentity struct {
|
|
pkiID common.PKIidType
|
|
lastAccessTime int64
|
|
peerIdentity api.PeerIdentityType
|
|
orgId api.OrgIdentityType
|
|
expirationTimer *time.Timer
|
|
}
|
|
|
|
func newStoredIdentity(pkiID common.PKIidType, identity api.PeerIdentityType, expirationTimer *time.Timer, org api.OrgIdentityType) *storedIdentity {
|
|
return &storedIdentity{
|
|
pkiID: pkiID,
|
|
lastAccessTime: time.Now().UnixNano(),
|
|
peerIdentity: identity,
|
|
expirationTimer: expirationTimer,
|
|
orgId: org,
|
|
}
|
|
}
|
|
|
|
func (si *storedIdentity) fetchIdentity() api.PeerIdentityType {
|
|
atomic.StoreInt64(&si.lastAccessTime, time.Now().UnixNano())
|
|
return si.peerIdentity
|
|
}
|
|
|
|
func (si *storedIdentity) fetchLastAccessTime() time.Time {
|
|
return time.Unix(0, atomic.LoadInt64(&si.lastAccessTime))
|
|
}
|
|
|
|
func (si *storedIdentity) cancelExpirationTimer() {
|
|
if si.expirationTimer == nil {
|
|
return
|
|
}
|
|
si.expirationTimer.Stop()
|
|
}
|
|
|
|
// SetIdentityUsageThreshold sets the usage threshold of identities.
|
|
// Identities that are not used at least once during the given time
|
|
// are purged
|
|
func SetIdentityUsageThreshold(duration time.Duration) {
|
|
atomic.StoreInt64((*int64)(&usageThreshold), int64(duration))
|
|
}
|
|
|
|
// GetIdentityUsageThreshold returns the usage threshold of identities.
|
|
// Identities that are not used at least once during the usage threshold
|
|
// duration are purged.
|
|
func GetIdentityUsageThreshold() time.Duration {
|
|
return time.Duration(atomic.LoadInt64((*int64)(&usageThreshold)))
|
|
}
|