diff --git a/cqrs/pkg/v3beta1/cqrs.go b/cqrs/pkg/v3beta1/cqrs.go new file mode 100644 index 00000000..933a3fd8 --- /dev/null +++ b/cqrs/pkg/v3beta1/cqrs.go @@ -0,0 +1,66 @@ +package pkg + +import ( + "context" +) + +type Command[T interface{}] interface { + Execute(apply T) ([]DomainEvent, error) +} + +type CommandHandler[K Command[T], T interface{}] interface { + Execute(ctx context.Context, command K, commandApplier T) (context.Context, error) +} + +type CommandHandlerImpl[K Command[T], T any] struct { + eventStore EventStore + eventsEmitter EventsEmitter +} + +func (f *CommandHandlerImpl[K, T]) Execute(ctx context.Context, command K, applier T) (context.Context, error) { + events, err := command.Execute(applier) + ctx = f.eventsEmitter.NotifyAll(ctx, events...) + return ctx, err +} + +type CommandHandlerBuilder[T interface{}] struct { + subscriber Subscriber + eventStoreSubscriber EventStoreSubscriber + eventsEmitter EventsEmitter +} + +func (s *CommandHandlerBuilder[T]) WithSubscriber(subscriber Subscriber) *CommandHandlerBuilder[T] { + s.subscriber = subscriber + return s +} + +func (s *CommandHandlerBuilder[T]) Build() CommandHandler[Command[T], T] { + commandHandler := new(CommandHandlerImpl[Command[T], T]) + commandHandler.eventsEmitter = s.eventsEmitter + s.eventsEmitter.Add(s.subscriber) + s.eventsEmitter.Add(s.eventStoreSubscriber) + return commandHandler +} + +func (s *CommandHandlerBuilder[T]) WithEventStore(store EventStore) *CommandHandlerBuilder[T] { + s.eventStoreSubscriber = EventStoreSubscriber{eventStore: store} + return s +} + +func (s *CommandHandlerBuilder[T]) WithEventsEmitter(emitter *StandardEventsEmitter) *CommandHandlerBuilder[T] { + s.eventsEmitter = emitter + return s +} + +type EventStoreSubscriber struct { + eventStore EventStore +} + +func (e EventStoreSubscriber) Update(ctx context.Context, eventsChn chan []DomainEvent) context.Context { + events := <-eventsChn + ctx, _, err := e.eventStore.Save(ctx, events...) + if err != nil { + panic(err) + } + return ctx +} diff --git a/cqrs/pkg/v3beta1/cqrs_test.go b/cqrs/pkg/v3beta1/cqrs_test.go new file mode 100644 index 00000000..938e2094 --- /dev/null +++ b/cqrs/pkg/v3beta1/cqrs_test.go @@ -0,0 +1,80 @@ +package pkg + +import ( + "context" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/health/grpc_health_v1" + "testing" +) + +func TestExecuteACommand(t *testing.T) { + subscriber := &FakeSubscriber{} + subscriber.mock.On("Update", nil).Return() + c := CommandHandlerBuilder[FakeCommandApplier]{} + f := FakeCommand[FakeCommandApplier]{} + v := FakeCommandApplier{} + + t.Run("v2", v2(c, f, v)) + +} + +func v2(c CommandHandlerBuilder[FakeCommandApplier], f FakeCommand[FakeCommandApplier], v FakeCommandApplier) func(t *testing.T) { + return func(t *testing.T) { + ctx := context.Background() + eventStore := &FakeEventStore{} + eventsEmitter := &StandardEventsEmitter{} + eventStore.mock.On("Save", nil).Return() + subscriber := &FakeSubscriber{} + subscriber.mock.On("Update", nil).Return() + commandHandler := c.WithEventStore(eventStore). + WithEventsEmitter(eventsEmitter). + WithSubscriber(subscriber). + Build() + ctx, err := commandHandler.Execute(ctx, f, v) + + assert.NoError(t, err) + + eventStore.mock.AssertCalled(t, "Save", nil) + subscriber.mock.AssertCalled(t, "Update", nil) + + } +} + +type FakeCommandApplier struct{} + +type FakeCommand[T FakeCommandApplier] struct{} + +func (f FakeCommand[T]) Execute(apply T) ([]DomainEvent, error) { + return nil, nil +} + +type FakeEventStore struct { + mock mock.Mock +} + +func (f *FakeEventStore) Save(ctx context.Context, events ...DomainEvent) (context.Context, []DomainEvent, error) { + f.mock.Called(nil) + return ctx, events, nil +} + +func (f *FakeEventStore) Load(ctx context.Context, id uuid.UUID) (context.Context, []DomainEvent, error) { + panic("implement me") +} + +func (f *FakeEventStore) Remove(ctx context.Context, uuid uuid.UUID) (context.Context, error) { + return ctx, nil +} +func (f *FakeEventStore) GetHealthClient(ctx context.Context) (context.Context, grpc_health_v1.HealthClient) { + return ctx, nil +} + +type FakeSubscriber struct { + mock mock.Mock +} + +func (g *FakeSubscriber) Update(ctx context.Context, eventsChn chan []DomainEvent) context.Context { + g.mock.Called(nil) + return ctx +} diff --git a/cqrs/pkg/v3beta1/domainevent.go b/cqrs/pkg/v3beta1/domainevent.go new file mode 100644 index 00000000..174aa7c8 --- /dev/null +++ b/cqrs/pkg/v3beta1/domainevent.go @@ -0,0 +1,9 @@ +package pkg + +import "github.com/google/uuid" + +type DomainEvent interface { + AggregateId() uuid.UUID + Name() string + Data() []byte +} diff --git a/cqrs/pkg/v3beta1/eventstore.go b/cqrs/pkg/v3beta1/eventstore.go new file mode 100644 index 00000000..e6c314a0 --- /dev/null +++ b/cqrs/pkg/v3beta1/eventstore.go @@ -0,0 +1,14 @@ +package pkg + +import ( + "context" + "github.com/google/uuid" + "google.golang.org/grpc/health/grpc_health_v1" +) + +type EventStore interface { + Save(ctx context.Context, events ...DomainEvent) (context.Context, []DomainEvent, error) + Load(ctx context.Context, id uuid.UUID) (context.Context, []DomainEvent, error) + Remove(ctx context.Context, uuid uuid.UUID) (context.Context, error) + GetHealthClient(ctx context.Context) (context.Context, grpc_health_v1.HealthClient) +} diff --git a/cqrs/pkg/v3beta1/standard_events_emitter.go b/cqrs/pkg/v3beta1/standard_events_emitter.go new file mode 100644 index 00000000..149638a4 --- /dev/null +++ b/cqrs/pkg/v3beta1/standard_events_emitter.go @@ -0,0 +1,45 @@ +package pkg + +import ( + "context" + "sync" +) + +type StandardEventsEmitter struct { + subscribers []Subscriber +} + +func (s *StandardEventsEmitter) NotifyAll(ctx context.Context, event ...DomainEvent) context.Context { + eventsChn := make(chan []DomainEvent, len(s.subscribers)) + var wg sync.WaitGroup + wg.Add(1) + go func() { + for range s.subscribers { + eventsChn <- event + } + }() + go func() { + for _, subscriber := range s.subscribers { + if subscriber != nil { + ctx = subscriber.Update(ctx, eventsChn) + } + } + wg.Done() + }() + wg.Wait() + close(eventsChn) + return ctx +} + +func (s *StandardEventsEmitter) Add(subscriber Subscriber) { + s.subscribers = append(s.subscribers, subscriber) +} + +type EventsEmitter interface { + NotifyAll(ctx context.Context, event ...DomainEvent) context.Context + Add(subscriber Subscriber) +} + +type Subscriber interface { + Update(ctx context.Context, eventsChn chan []DomainEvent) context.Context +} diff --git a/cqrs/pkg/v3beta1/standard_events_emitter_test.go b/cqrs/pkg/v3beta1/standard_events_emitter_test.go new file mode 100644 index 00000000..a2a56340 --- /dev/null +++ b/cqrs/pkg/v3beta1/standard_events_emitter_test.go @@ -0,0 +1,49 @@ +package pkg + +import ( + "context" + "github.com/google/uuid" + "github.com/opicaud/monorepo/events/pkg" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestShouldUseStandardEmitter(t *testing.T) { + ctx := context.Background() + emitter := StandardEventsEmitter{} + subscriber := SubscriberForTest{} + emitter.Add(&subscriber) + emitter.Add(&SubscriberForTest{}) + ctx = emitter.NotifyAll(ctx, StandardEvent{aggregateId: uuid.New(), name: "test"}) + + assert.Len(t, subscriber.eventsFromUpdate, 1) + assert.Equal(t, subscriber.eventsFromUpdate[0].Name(), "test") + +} + +type SubscriberForTest struct { + eventsFromUpdate []pkg.DomainEvent +} + +func (s *SubscriberForTest) Update(ctx context.Context, eventsChn chan []pkg.DomainEvent) context.Context { + s.eventsFromUpdate = <-eventsChn + return ctx +} + +type StandardEvent struct { + aggregateId uuid.UUID + name string + data []byte +} + +func (s StandardEvent) AggregateId() uuid.UUID { + return s.aggregateId +} + +func (s StandardEvent) Name() string { + return s.name +} + +func (s StandardEvent) Data() []byte { + return nil +}