Skip to content

Commit

Permalink
Adding async push functionality (#3)
Browse files Browse the repository at this point in the history
Addresses #2.
  • Loading branch information
dcarbone authored Mar 5, 2024
2 parents 01ca565 + 2d76b54 commit 2db97ab
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
File renamed without changes.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ No recipient will prevent or even delay an event from being pushed to other reci
force event recipients to be as expedient as possible in its implementation, as the rest of the Bus won't
wait for it to handle one event before sending another.

### Global subscription
For simplicity's sake, you either are listening or you aren't. If you don't care about a topic, ignore the message.
### Filtering
When registering a recipient, you may optionally provide a list of string's of specific topics or `*regex.Regexp`
instances. Your recipient will then only receive events that pass through those filters.

### No external deps
Only use stdlib modules
Expand Down
34 changes: 25 additions & 9 deletions seb.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,37 @@ func New(opts ...BusOpt) *Bus {
return &b
}

// Push will immediately send a new event to all currently registered recipients
// Push will immediately send a new event to all currently registered recipients, blocking until completed.
func (b *Bus) Push(ctx context.Context, topic string, data any) error {
return b.sendEvent(b.buildEvent(ctx, topic, data))
}

// PushTo attempts to push an even to a specific recipient
// PushAsync pushes an event to all recipients without blocking the caller. You amy optionally provide errc if you
// wish know about any / all errors that occurred during the push. Otherwise, set errc to nil.
func (b *Bus) PushAsync(ctx context.Context, topic string, data any, errc chan<- error) {
ev := b.buildEvent(ctx, topic, data)
if errc == nil {
go func() { _ = b.sendEvent(ev) }()
} else {
go func() { errc <- b.sendEvent(ev) }()
}
}

// PushTo attempts to push an even to a specific recipient, blocking until completed.
func (b *Bus) PushTo(ctx context.Context, to, topic string, data any) error {
return b.sendEventTo(to, b.buildEvent(ctx, topic, data))
}

// PushToAsync attempts to push an event to a specific recipient without blocking the caller.
func (b *Bus) PushToAsync(ctx context.Context, to, topic string, data any, errc chan<- error) {
ev := b.buildEvent(ctx, topic, data)
if errc == nil {
go func() { _ = b.sendEventTo(to, ev) }()
} else {
go func() { errc <- b.sendEventTo(to, ev) }()
}
}

// Request will push a new event with the Reply chan defined, blocking until a single response has been received
// or the provided context expires
func (b *Bus) Request(ctx context.Context, topic string, data any) (Reply, error) {
Expand Down Expand Up @@ -467,15 +488,10 @@ func (b *Bus) sendEvent(ev Event) error {

func (b *Bus) sendEventTo(to string, ev Event) error {
b.mu.Lock()

if w, ok := b.recipients[to]; ok {
b.mu.Unlock()

errCh := make(chan error, 1)
defer close(errCh)

go func() { errCh <- w.push(ev) }()

return <-errCh
return w.push(ev)
}

b.mu.Unlock()
Expand Down

0 comments on commit 2db97ab

Please sign in to comment.