123 lines
2.7 KiB
Go
123 lines
2.7 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package util
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
const (
|
|
subscriptionBuffSize = 50
|
|
)
|
|
|
|
// PubSub defines a struct that one can use to:
|
|
// - publish items to a topic to multiple subscribers
|
|
// - and subscribe to items from a topic
|
|
// The subscriptions have a TTL and are cleaned when it passes.
|
|
type PubSub struct {
|
|
sync.RWMutex
|
|
|
|
// a map from topic to Set of subscriptions
|
|
subscriptions map[string]*Set
|
|
}
|
|
|
|
// Subscription defines a subscription to a topic
|
|
// that can be used to receive publishes on
|
|
type Subscription interface {
|
|
// Listen blocks until a publish was made
|
|
// to the subscription, or an error if the
|
|
// subscription's TTL passed
|
|
Listen() (interface{}, error)
|
|
}
|
|
|
|
type subscription struct {
|
|
top string
|
|
ttl time.Duration
|
|
c chan interface{}
|
|
}
|
|
|
|
// Listen blocks until a publish was made
|
|
// to the subscription, or an error if the
|
|
// subscription's TTL passed
|
|
func (s *subscription) Listen() (interface{}, error) {
|
|
select {
|
|
case <-time.After(s.ttl):
|
|
return nil, errors.New("timed out")
|
|
case item := <-s.c:
|
|
return item, nil
|
|
}
|
|
}
|
|
|
|
// NewPubSub creates a new PubSub with an empty
|
|
// set of subscriptions
|
|
func NewPubSub() *PubSub {
|
|
return &PubSub{
|
|
subscriptions: make(map[string]*Set),
|
|
}
|
|
}
|
|
|
|
// Publish publishes an item to all subscribers on the topic
|
|
func (ps *PubSub) Publish(topic string, item interface{}) error {
|
|
ps.RLock()
|
|
defer ps.RUnlock()
|
|
s, subscribed := ps.subscriptions[topic]
|
|
if !subscribed {
|
|
return errors.New("no subscribers")
|
|
}
|
|
for _, sub := range s.ToArray() {
|
|
c := sub.(*subscription).c
|
|
select {
|
|
case c <- item:
|
|
default: // Not enough room in buffer, continue in order to not block publisher
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Subscribe returns a subscription to a topic that expires when given TTL passes
|
|
func (ps *PubSub) Subscribe(topic string, ttl time.Duration) Subscription {
|
|
sub := &subscription{
|
|
top: topic,
|
|
ttl: ttl,
|
|
c: make(chan interface{}, subscriptionBuffSize),
|
|
}
|
|
|
|
ps.Lock()
|
|
// Add subscription to subscriptions map
|
|
s, exists := ps.subscriptions[topic]
|
|
// If no subscription set for the topic exists, create one
|
|
if !exists {
|
|
s = NewSet()
|
|
ps.subscriptions[topic] = s
|
|
}
|
|
ps.Unlock()
|
|
|
|
// Add the subscription
|
|
s.Add(sub)
|
|
|
|
// When the timeout expires, remove the subscription
|
|
time.AfterFunc(ttl, func() {
|
|
ps.unSubscribe(sub)
|
|
})
|
|
return sub
|
|
}
|
|
|
|
func (ps *PubSub) unSubscribe(sub *subscription) {
|
|
ps.Lock()
|
|
defer ps.Unlock()
|
|
ps.subscriptions[sub.top].Remove(sub)
|
|
if ps.subscriptions[sub.top].Size() != 0 {
|
|
return
|
|
}
|
|
// Else, this is the last subscription for the topic.
|
|
// Remove the set from the subscriptions map
|
|
delete(ps.subscriptions, sub.top)
|
|
}
|