diff --git a/.github/tests.yml b/.github/workflows/tests.yml similarity index 100% rename from .github/tests.yml rename to .github/workflows/tests.yml diff --git a/README.md b/README.md index 3db7769..0d01026 100644 --- a/README.md +++ b/README.md @@ -83,8 +83,9 @@ No recipient will prevent or even delay an event from being pushed to other reci force event recipients to be as expedient as possible in its implementation, as the rest of the Bus won't wait for it to handle one event before sending another. -### Global subscription -For simplicity's sake, you either are listening or you aren't. If you don't care about a topic, ignore the message. +### Filtering +When registering a recipient, you may optionally provide a list of string's of specific topics or `*regex.Regexp` +instances. Your recipient will then only receive events that pass through those filters. ### No external deps Only use stdlib modules diff --git a/seb.go b/seb.go index f746f6a..c2e0307 100644 --- a/seb.go +++ b/seb.go @@ -302,16 +302,37 @@ func New(opts ...BusOpt) *Bus { return &b } -// Push will immediately send a new event to all currently registered recipients +// Push will immediately send a new event to all currently registered recipients, blocking until completed. func (b *Bus) Push(ctx context.Context, topic string, data any) error { return b.sendEvent(b.buildEvent(ctx, topic, data)) } -// PushTo attempts to push an even to a specific recipient +// PushAsync pushes an event to all recipients without blocking the caller. You amy optionally provide errc if you +// wish know about any / all errors that occurred during the push. Otherwise, set errc to nil. +func (b *Bus) PushAsync(ctx context.Context, topic string, data any, errc chan<- error) { + ev := b.buildEvent(ctx, topic, data) + if errc == nil { + go func() { _ = b.sendEvent(ev) }() + } else { + go func() { errc <- b.sendEvent(ev) }() + } +} + +// PushTo attempts to push an even to a specific recipient, blocking until completed. func (b *Bus) PushTo(ctx context.Context, to, topic string, data any) error { return b.sendEventTo(to, b.buildEvent(ctx, topic, data)) } +// PushToAsync attempts to push an event to a specific recipient without blocking the caller. +func (b *Bus) PushToAsync(ctx context.Context, to, topic string, data any, errc chan<- error) { + ev := b.buildEvent(ctx, topic, data) + if errc == nil { + go func() { _ = b.sendEventTo(to, ev) }() + } else { + go func() { errc <- b.sendEventTo(to, ev) }() + } +} + // Request will push a new event with the Reply chan defined, blocking until a single response has been received // or the provided context expires func (b *Bus) Request(ctx context.Context, topic string, data any) (Reply, error) { @@ -467,15 +488,10 @@ func (b *Bus) sendEvent(ev Event) error { func (b *Bus) sendEventTo(to string, ev Event) error { b.mu.Lock() + if w, ok := b.recipients[to]; ok { b.mu.Unlock() - - errCh := make(chan error, 1) - defer close(errCh) - - go func() { errCh <- w.push(ev) }() - - return <-errCh + return w.push(ev) } b.mu.Unlock()