From 6b875482e8813c9d2cf4a0adbca765e6477cf4a7 Mon Sep 17 00:00:00 2001 From: Daniel Carbone Date: Tue, 5 Mar 2024 07:34:13 -0600 Subject: [PATCH 1/3] adding async push functionality --- README.md | 5 +++-- seb.go | 40 +++++++++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 3db7769..0d01026 100644 --- a/README.md +++ b/README.md @@ -83,8 +83,9 @@ No recipient will prevent or even delay an event from being pushed to other reci force event recipients to be as expedient as possible in its implementation, as the rest of the Bus won't wait for it to handle one event before sending another. -### Global subscription -For simplicity's sake, you either are listening or you aren't. If you don't care about a topic, ignore the message. +### Filtering +When registering a recipient, you may optionally provide a list of string's of specific topics or `*regex.Regexp` +instances. Your recipient will then only receive events that pass through those filters. ### No external deps Only use stdlib modules diff --git a/seb.go b/seb.go index f746f6a..c261b2c 100644 --- a/seb.go +++ b/seb.go @@ -302,16 +302,27 @@ func New(opts ...BusOpt) *Bus { return &b } -// Push will immediately send a new event to all currently registered recipients +// Push will immediately send a new event to all currently registered recipients, blocking until completed. func (b *Bus) Push(ctx context.Context, topic string, data any) error { return b.sendEvent(b.buildEvent(ctx, topic, data)) } -// PushTo attempts to push an even to a specific recipient +// PushAsync pushes an event to all recipients without blocking the caller. You amy optionally provide errc if you +// wish know about any / all errors that occurred during the push. Otherwise, set errc to nil. +func (b *Bus) PushAsync(ctx context.Context, topic string, data any, errc chan<- error) { + b.sendEventAsync(b.buildEvent(ctx, topic, data), errc) +} + +// PushTo attempts to push an even to a specific recipient, blocking until completed. func (b *Bus) PushTo(ctx context.Context, to, topic string, data any) error { return b.sendEventTo(to, b.buildEvent(ctx, topic, data)) } +// PushToAsync attempts to push an event to a specific recipient without blocking the caller. +func (b *Bus) PushToAsync(ctx context.Context, to, topic string, data any, errc chan<- error) { + b.sendEventToAsync(to, b.buildEvent(ctx, topic, data), errc) +} + // Request will push a new event with the Reply chan defined, blocking until a single response has been received // or the provided context expires func (b *Bus) Request(ctx context.Context, topic string, data any) (Reply, error) { @@ -465,17 +476,20 @@ func (b *Bus) sendEvent(ev Event) error { return nil } +func (b *Bus) sendEventAsync(ev Event, errc chan<- error) { + if errc == nil { + go func() { _ = b.sendEvent(ev) }() + } else { + go func() { errc <- b.sendEvent(ev) }() + } +} + func (b *Bus) sendEventTo(to string, ev Event) error { b.mu.Lock() + if w, ok := b.recipients[to]; ok { b.mu.Unlock() - - errCh := make(chan error, 1) - defer close(errCh) - - go func() { errCh <- w.push(ev) }() - - return <-errCh + return w.push(ev) } b.mu.Unlock() @@ -483,6 +497,14 @@ func (b *Bus) sendEventTo(to string, ev Event) error { return fmt.Errorf("%w: %s", ErrRecipientNotFound, to) } +func (b *Bus) sendEventToAsync(to string, ev Event, errc chan<- error) { + if errc == nil { + go func() { _ = b.sendEventTo(to, ev) }() + } else { + go func() { errc <- b.sendEventTo(to, ev) }() + } +} + func (b *Bus) doRequest(ctx context.Context, to, topic string, data any) (Reply, error) { ch := make(chan Reply) defer close(ch) From 1ccec491c5bd54b72def224ffca3470e65b7e6d4 Mon Sep 17 00:00:00 2001 From: Daniel Carbone Date: Tue, 5 Mar 2024 07:36:37 -0600 Subject: [PATCH 2/3] reducing call stack --- seb.go | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/seb.go b/seb.go index c261b2c..c2e0307 100644 --- a/seb.go +++ b/seb.go @@ -310,7 +310,12 @@ func (b *Bus) Push(ctx context.Context, topic string, data any) error { // PushAsync pushes an event to all recipients without blocking the caller. You amy optionally provide errc if you // wish know about any / all errors that occurred during the push. Otherwise, set errc to nil. func (b *Bus) PushAsync(ctx context.Context, topic string, data any, errc chan<- error) { - b.sendEventAsync(b.buildEvent(ctx, topic, data), errc) + ev := b.buildEvent(ctx, topic, data) + if errc == nil { + go func() { _ = b.sendEvent(ev) }() + } else { + go func() { errc <- b.sendEvent(ev) }() + } } // PushTo attempts to push an even to a specific recipient, blocking until completed. @@ -320,7 +325,12 @@ func (b *Bus) PushTo(ctx context.Context, to, topic string, data any) error { // PushToAsync attempts to push an event to a specific recipient without blocking the caller. func (b *Bus) PushToAsync(ctx context.Context, to, topic string, data any, errc chan<- error) { - b.sendEventToAsync(to, b.buildEvent(ctx, topic, data), errc) + ev := b.buildEvent(ctx, topic, data) + if errc == nil { + go func() { _ = b.sendEventTo(to, ev) }() + } else { + go func() { errc <- b.sendEventTo(to, ev) }() + } } // Request will push a new event with the Reply chan defined, blocking until a single response has been received @@ -476,14 +486,6 @@ func (b *Bus) sendEvent(ev Event) error { return nil } -func (b *Bus) sendEventAsync(ev Event, errc chan<- error) { - if errc == nil { - go func() { _ = b.sendEvent(ev) }() - } else { - go func() { errc <- b.sendEvent(ev) }() - } -} - func (b *Bus) sendEventTo(to string, ev Event) error { b.mu.Lock() @@ -497,14 +499,6 @@ func (b *Bus) sendEventTo(to string, ev Event) error { return fmt.Errorf("%w: %s", ErrRecipientNotFound, to) } -func (b *Bus) sendEventToAsync(to string, ev Event, errc chan<- error) { - if errc == nil { - go func() { _ = b.sendEventTo(to, ev) }() - } else { - go func() { errc <- b.sendEventTo(to, ev) }() - } -} - func (b *Bus) doRequest(ctx context.Context, to, topic string, data any) (Reply, error) { ch := make(chan Reply) defer close(ch) From 2d76b548d32ee9173aa0482593e477bdc601d014 Mon Sep 17 00:00:00 2001 From: Daniel Carbone Date: Tue, 5 Mar 2024 07:39:22 -0600 Subject: [PATCH 3/3] putting the workflow in the right dir >_< --- .github/{ => workflows}/tests.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/{ => workflows}/tests.yml (100%) diff --git a/.github/tests.yml b/.github/workflows/tests.yml similarity index 100% rename from .github/tests.yml rename to .github/workflows/tests.yml