sliding-sync/pubsub/pubsub.go
2024-03-11 10:30:03 +00:00

150 lines
4.0 KiB
Go

package pubsub
import (
"fmt"
"os"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
// logger is the package-level zerolog logger. It is created with a timestamp
// field and then given a console writer that targets os.Stderr and renders
// times as HH:MM:SS.
// NOTE(review): presumably Output replaces the os.Stdout writer passed to New,
// so nothing is actually written to stdout — confirm against the zerolog docs.
var logger = zerolog.New(os.Stdout).
	With().
	Timestamp().
	Logger().
	Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05"})
// Payload is a message that can be handed to Notifier.Notify and is delivered
// to the callback passed to Listener.Listen.
type Payload interface {
// Type returns the type of the payload; used mostly for logging and as the
// label value for prometheus metrics.
Type() string
}
// emptyPayloadType is the value reported by emptyPayload.Type.
const emptyPayloadType = "empty"

// emptyPayload is an internal marker that serves as a synchronisation point
// with consumers when bufferSize==0.
//
// With no buffer, pubsub is expected to behave synchronously: the producer
// waits until the consumer has finished processing a message before sending
// the next one. Tests rely on this to avoid race conditions. An unbuffered
// channel alone is not sufficient, because the producer is woken as soon as
// the consumer receives the message, not when it has finished processing it.
// To cover processing as well, an emptyPayload is sent immediately after each
// real message; once that send returns, the previous payload has been fully
// consumed.
type emptyPayload struct{}

// Type reports the fixed marker type of the synchronisation payload.
func (*emptyPayload) Type() string {
	return emptyPayloadType
}
// Listener represents the common functions required by all subscription listeners.
type Listener interface {
// Listen begins listening on the channel named chanName, invoking fn for each
// payload received. Blocks until Close() is called.
Listen(chanName string, fn func(p Payload)) error
// Close the listener. No more callbacks should fire.
Close() error
}
// Notifier represents the common functions required by all notifiers.
type Notifier interface {
// Notify chanName that there is a new payload p. Return an error if we failed to send the notification.
Notify(chanName string, p Payload) error
// Close shuts the notifier down; it is called when notifications are no longer needed.
Close() error
}
// PubSub is an in-memory publish/subscribe hub backed by Go channels, one per
// channel name. Its Notify, Listen and Close methods match the Notifier and
// Listener interfaces.
type PubSub struct {
// chans maps a channel name to its Go channel; entries are created lazily by getChan.
chans map[string]chan Payload
// mu guards access to chans. Notify also holds it around its synchronisation
// send when bufferSize is 0.
mu *sync.Mutex
// closed records that Close has already run, so later calls return early.
closed bool
// bufferSize is the capacity used for every channel created by getChan. A value
// of 0 makes Notify wait for the listener to finish processing each payload.
bufferSize int
}
// NewPubSub returns an empty PubSub whose channels are created on demand with
// a capacity of bufferSize. With bufferSize 0 the PubSub behaves synchronously:
// Notify waits until the listener has processed each payload.
func NewPubSub(bufferSize int) *PubSub {
	ps := new(PubSub)
	ps.chans = map[string]chan Payload{}
	ps.mu = new(sync.Mutex)
	ps.bufferSize = bufferSize
	return ps
}
// getChan returns the channel registered under chanName, creating it with the
// configured buffer size and storing it if none exists yet. The lookup and the
// insertion happen under ps.mu.
func (ps *PubSub) getChan(chanName string) chan Payload {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if existing := ps.chans[chanName]; existing != nil {
		return existing
	}
	created := make(chan Payload, ps.bufferSize)
	ps.chans[chanName] = created
	return created
}
// Notify publishes p on the channel named chanName, creating the channel if it
// does not exist yet.
//
// It returns an error if p could not be handed to the channel within 5 seconds
// (no listener receiving on an unbuffered channel, or the buffer is full).
//
// When bufferSize is 0, Notify then sends an emptyPayload on the same channel.
// That second send only completes once the listener comes back to receive
// again, i.e. after it has finished processing p, which makes Notify
// synchronous with the consumer. The second send has no timeout.
//
// Notify must not be called after Close has closed the channel: as with any
// send on a closed Go channel, that panics.
func (ps *PubSub) Notify(chanName string, p Payload) error {
	ch := ps.getChan(chanName)

	// Use an explicit timer instead of time.After so it is released as soon as
	// Notify returns. Before Go 1.23, a time.After timer stays alive until it
	// fires, so every successful Notify would pin a timer for 5 seconds.
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()

	select {
	case ch <- p:
	case <-timeout.C:
		return fmt.Errorf("notify with payload %v timed out", p.Type())
	}

	if ps.bufferSize == 0 {
		// for some reason go test -race flags this as racing with calls
		// to close(ch), despite the fact that it _should_ be thread-safe :S
		// Holding ps.mu here keeps Close (which closes channels under the same
		// lock) from running concurrently with this send.
		ps.mu.Lock()
		ch <- &emptyPayload{}
		ps.mu.Unlock()
	}
	return nil
}
// Close closes every channel created so far, which makes each Listen call on
// those channels return. It is safe to call more than once and from multiple
// goroutines: only the first call closes the channels, later calls are no-ops.
// It always returns nil.
func (ps *PubSub) Close() error {
	// Take the lock before touching ps.closed. Checking the flag outside the
	// lock is a data race, and two concurrent callers could both observe
	// closed == false and then both close the same channels, which panics.
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if ps.closed {
		return nil
	}
	ps.closed = true
	for _, ch := range ps.chans {
		close(ch)
	}
	return nil
}
// Listen receives payloads from the channel named chanName (creating it if
// necessary) and calls fn for each one, in order. Payloads whose Type is
// emptyPayloadType are synchronisation markers and are dropped without
// invoking fn. Listen blocks until the channel is closed and then returns nil.
func (ps *PubSub) Listen(chanName string, fn func(p Payload)) error {
	incoming := ps.getChan(chanName)
	for {
		msg, open := <-incoming
		if !open {
			// The channel was closed: stop listening.
			return nil
		}
		if msg.Type() != emptyPayloadType {
			fn(msg)
		}
	}
}
// PromNotifier is a wrapper around a Notifier which adds Prometheus metrics:
// every payload passed to Notify is counted, labelled by its payload type.
type PromNotifier struct {
// Notifier is the wrapped notifier that calls are forwarded to.
Notifier
// msgCounter counts payloads passed to Notify, labelled by Payload.Type().
msgCounter *prometheus.CounterVec
}
// Notify increments the payload counter for payload's type and then forwards
// the notification to the wrapped Notifier, returning its result. The counter
// is incremented before the send, whether or not the send succeeds.
func (p *PromNotifier) Notify(chanName string, payload Payload) error {
	counter := p.msgCounter.WithLabelValues(payload.Type())
	counter.Inc()
	return p.Notifier.Notify(chanName, payload)
}
// Close unregisters the payload counter from the default Prometheus registry
// and then closes the wrapped Notifier, returning its error.
func (p *PromNotifier) Close() error {
	prometheus.Unregister(p.msgCounter)
	err := p.Notifier.Close()
	return err
}
// NewPromNotifier wraps n so that every published payload is counted in the
// Prometheus counter sliding_sync_<subsystem>_num_payloads, labelled by
// payload_type. The counter is registered with the default registry via
// MustRegister before the wrapper is returned.
func NewPromNotifier(n Notifier, subsystem string) Notifier {
	opts := prometheus.CounterOpts{
		Namespace: "sliding_sync",
		Subsystem: subsystem,
		Name:      "num_payloads",
		Help:      "Number of payloads published",
	}
	counter := prometheus.NewCounterVec(opts, []string{"payload_type"})
	prometheus.MustRegister(counter)
	return &PromNotifier{
		Notifier:   n,
		msgCounter: counter,
	}
}