107 lines
2.8 KiB
Go
107 lines
2.8 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package comm
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/hyperledger/fabric/gossip/common"
|
|
)
|
|
|
|
// ChannelDeMultiplexer is a struct that can receive channel registrations (AddChannel)
|
|
// and publications (DeMultiplex) and it broadcasts the publications to registrations
|
|
// according to their predicate. Can only be closed once and never open after a close.
|
|
type ChannelDeMultiplexer struct {
|
|
// lock protects everything below it.
|
|
lock sync.Mutex
|
|
closed bool // one way boolean from false -> true
|
|
stopCh chan struct{}
|
|
// deMuxInProgress keeps track of any calls to DeMultiplex
|
|
// that are still being handled. This is used to determine
|
|
// when it is safe to close all of the tracked channels
|
|
deMuxInProgress sync.WaitGroup
|
|
channels []*channel
|
|
}
|
|
|
|
// NewChannelDemultiplexer creates a new ChannelDeMultiplexer
|
|
func NewChannelDemultiplexer() *ChannelDeMultiplexer {
|
|
return &ChannelDeMultiplexer{stopCh: make(chan struct{})}
|
|
}
|
|
|
|
type channel struct {
|
|
pred common.MessageAcceptor
|
|
ch chan<- interface{}
|
|
}
|
|
|
|
// Close closes this channel, which makes all channels registered before
|
|
// to close as well.
|
|
func (m *ChannelDeMultiplexer) Close() {
|
|
m.lock.Lock()
|
|
if m.closed {
|
|
m.lock.Unlock()
|
|
return
|
|
}
|
|
m.closed = true
|
|
close(m.stopCh)
|
|
m.deMuxInProgress.Wait()
|
|
for _, ch := range m.channels {
|
|
close(ch.ch)
|
|
}
|
|
m.channels = nil
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
// AddChannel registers a channel with a certain predicate. AddChannel
|
|
// returns a read-only channel that will produce values that are
|
|
// matched by the predicate function.
|
|
//
|
|
// If the DeMultiplexer is closed, the channel returned will be closed
|
|
// to prevent users of the channel from waiting on the channel.
|
|
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) <-chan interface{} {
|
|
m.lock.Lock()
|
|
if m.closed { // closed once, can't put anything more in.
|
|
m.lock.Unlock()
|
|
ch := make(chan interface{})
|
|
close(ch)
|
|
return ch
|
|
}
|
|
bidirectionalCh := make(chan interface{}, 10)
|
|
// Assignment to channel converts bidirectionalCh to send-only.
|
|
// Return converts bidirectionalCh to a receive-only.
|
|
ch := &channel{ch: bidirectionalCh, pred: predicate}
|
|
m.channels = append(m.channels, ch)
|
|
m.lock.Unlock()
|
|
return bidirectionalCh
|
|
}
|
|
|
|
// DeMultiplex broadcasts the message to all channels that were returned
|
|
// by AddChannel calls and that hold the respected predicates.
|
|
//
|
|
// Blocks if any one channel that would receive msg has a full buffer.
|
|
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
|
|
m.lock.Lock()
|
|
if m.closed {
|
|
m.lock.Unlock()
|
|
return
|
|
}
|
|
channels := m.channels
|
|
m.deMuxInProgress.Add(1)
|
|
m.lock.Unlock()
|
|
|
|
for _, ch := range channels {
|
|
if ch.pred(msg) {
|
|
select {
|
|
case <-m.stopCh:
|
|
m.deMuxInProgress.Done()
|
|
return // stopping
|
|
case ch.ch <- msg:
|
|
}
|
|
}
|
|
}
|
|
m.deMuxInProgress.Done()
|
|
}
|