Skip to content

Commit

Permalink
refactor: start to decomission events dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
opicaud committed Jan 4, 2024
1 parent 79200b4 commit 5a8c7fd
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 0 deletions.
66 changes: 66 additions & 0 deletions cqrs/pkg/v3beta1/cqrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package pkg

import (
"context"
)

type Command[T interface{}] interface {
Execute(apply T) ([]DomainEvent, error)
}

type CommandHandler[K Command[T], T interface{}] interface {
Execute(ctx context.Context, command K, commandApplier T) (context.Context, error)
}

type CommandHandlerImpl[K Command[T], T any] struct {
eventStore EventStore
eventsEmitter EventsEmitter
}

func (f *CommandHandlerImpl[K, T]) Execute(ctx context.Context, command K, applier T) (context.Context, error) {
events, err := command.Execute(applier)
ctx = f.eventsEmitter.NotifyAll(ctx, events...)
return ctx, err
}

type CommandHandlerBuilder[T interface{}] struct {
subscriber Subscriber
eventStoreSubscriber EventStoreSubscriber
eventsEmitter EventsEmitter
}

func (s *CommandHandlerBuilder[T]) WithSubscriber(subscriber Subscriber) *CommandHandlerBuilder[T] {
s.subscriber = subscriber
return s
}

func (s *CommandHandlerBuilder[T]) Build() CommandHandler[Command[T], T] {
commandHandler := new(CommandHandlerImpl[Command[T], T])
commandHandler.eventsEmitter = s.eventsEmitter
s.eventsEmitter.Add(s.subscriber)
s.eventsEmitter.Add(s.eventStoreSubscriber)
return commandHandler
}

func (s *CommandHandlerBuilder[T]) WithEventStore(store EventStore) *CommandHandlerBuilder[T] {
s.eventStoreSubscriber = EventStoreSubscriber{eventStore: store}
return s
}

func (s *CommandHandlerBuilder[T]) WithEventsEmitter(emitter *StandardEventsEmitter) *CommandHandlerBuilder[T] {
s.eventsEmitter = emitter
return s
}

type EventStoreSubscriber struct {
eventStore EventStore
}

func (e EventStoreSubscriber) Update(ctx context.Context, eventsChn chan []DomainEvent) context.Context {
events := <-eventsChn
ctx, _, err := e.eventStore.Save(ctx, events...)
if err != nil {
panic(err)
}
return ctx
}
80 changes: 80 additions & 0 deletions cqrs/pkg/v3beta1/cqrs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package pkg

import (
"context"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/health/grpc_health_v1"
"testing"
)

func TestExecuteACommand(t *testing.T) {
subscriber := &FakeSubscriber{}
subscriber.mock.On("Update", nil).Return()
c := CommandHandlerBuilder[FakeCommandApplier]{}
f := FakeCommand[FakeCommandApplier]{}
v := FakeCommandApplier{}

t.Run("v2", v2(c, f, v))

}

func v2(c CommandHandlerBuilder[FakeCommandApplier], f FakeCommand[FakeCommandApplier], v FakeCommandApplier) func(t *testing.T) {
return func(t *testing.T) {
ctx := context.Background()
eventStore := &FakeEventStore{}
eventsEmitter := &StandardEventsEmitter{}
eventStore.mock.On("Save", nil).Return()
subscriber := &FakeSubscriber{}
subscriber.mock.On("Update", nil).Return()
commandHandler := c.WithEventStore(eventStore).
WithEventsEmitter(eventsEmitter).
WithSubscriber(subscriber).
Build()
ctx, err := commandHandler.Execute(ctx, f, v)

assert.NoError(t, err)

eventStore.mock.AssertCalled(t, "Save", nil)
subscriber.mock.AssertCalled(t, "Update", nil)

}
}

type FakeCommandApplier struct{}

type FakeCommand[T FakeCommandApplier] struct{}

func (f FakeCommand[T]) Execute(apply T) ([]DomainEvent, error) {
return nil, nil
}

type FakeEventStore struct {
mock mock.Mock
}

func (f *FakeEventStore) Save(ctx context.Context, events ...DomainEvent) (context.Context, []DomainEvent, error) {
f.mock.Called(nil)
return ctx, events, nil
}

func (f *FakeEventStore) Load(ctx context.Context, id uuid.UUID) (context.Context, []DomainEvent, error) {
panic("implement me")
}

func (f *FakeEventStore) Remove(ctx context.Context, uuid uuid.UUID) (context.Context, error) {
return ctx, nil
}
func (f *FakeEventStore) GetHealthClient(ctx context.Context) (context.Context, grpc_health_v1.HealthClient) {
return ctx, nil
}

type FakeSubscriber struct {
mock mock.Mock
}

func (g *FakeSubscriber) Update(ctx context.Context, eventsChn chan []DomainEvent) context.Context {
g.mock.Called(nil)
return ctx
}
9 changes: 9 additions & 0 deletions cqrs/pkg/v3beta1/domainevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pkg

import "github.com/google/uuid"

type DomainEvent interface {
AggregateId() uuid.UUID
Name() string
Data() []byte
}
14 changes: 14 additions & 0 deletions cqrs/pkg/v3beta1/eventstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pkg

import (
"context"
"github.com/google/uuid"
"google.golang.org/grpc/health/grpc_health_v1"
)

type EventStore interface {
Save(ctx context.Context, events ...DomainEvent) (context.Context, []DomainEvent, error)
Load(ctx context.Context, id uuid.UUID) (context.Context, []DomainEvent, error)
Remove(ctx context.Context, uuid uuid.UUID) (context.Context, error)
GetHealthClient(ctx context.Context) (context.Context, grpc_health_v1.HealthClient)
}
45 changes: 45 additions & 0 deletions cqrs/pkg/v3beta1/standard_events_emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pkg

import (
"context"
"sync"
)

type StandardEventsEmitter struct {
subscribers []Subscriber
}

func (s *StandardEventsEmitter) NotifyAll(ctx context.Context, event ...DomainEvent) context.Context {
eventsChn := make(chan []DomainEvent, len(s.subscribers))
var wg sync.WaitGroup
wg.Add(1)
go func() {
for range s.subscribers {
eventsChn <- event
}
}()
go func() {
for _, subscriber := range s.subscribers {
if subscriber != nil {
ctx = subscriber.Update(ctx, eventsChn)
}
}
wg.Done()
}()
wg.Wait()
close(eventsChn)
return ctx
}

func (s *StandardEventsEmitter) Add(subscriber Subscriber) {
s.subscribers = append(s.subscribers, subscriber)
}

type EventsEmitter interface {
NotifyAll(ctx context.Context, event ...DomainEvent) context.Context
Add(subscriber Subscriber)
}

type Subscriber interface {
Update(ctx context.Context, eventsChn chan []DomainEvent) context.Context
}
49 changes: 49 additions & 0 deletions cqrs/pkg/v3beta1/standard_events_emitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pkg

import (
"context"
"github.com/google/uuid"
"github.com/opicaud/monorepo/events/pkg"
"github.com/stretchr/testify/assert"
"testing"
)

func TestShouldUseStandardEmitter(t *testing.T) {
ctx := context.Background()
emitter := StandardEventsEmitter{}
subscriber := SubscriberForTest{}
emitter.Add(&subscriber)
emitter.Add(&SubscriberForTest{})
ctx = emitter.NotifyAll(ctx, StandardEvent{aggregateId: uuid.New(), name: "test"})

assert.Len(t, subscriber.eventsFromUpdate, 1)
assert.Equal(t, subscriber.eventsFromUpdate[0].Name(), "test")

}

type SubscriberForTest struct {
eventsFromUpdate []pkg.DomainEvent
}

func (s *SubscriberForTest) Update(ctx context.Context, eventsChn chan []pkg.DomainEvent) context.Context {
s.eventsFromUpdate = <-eventsChn
return ctx
}

type StandardEvent struct {
aggregateId uuid.UUID
name string
data []byte
}

func (s StandardEvent) AggregateId() uuid.UUID {
return s.aggregateId
}

func (s StandardEvent) Name() string {
return s.name
}

func (s StandardEvent) Data() []byte {
return nil
}

0 comments on commit 5a8c7fd

Please sign in to comment.