diff --git a/README.md b/README.md index 32173fa..2c90067 100644 --- a/README.md +++ b/README.md @@ -72,11 +72,25 @@ return [ /** * By default Aggregate classes are mapped to a category name based on their - * class name. Example AccountAggregate would be published to an account - * stream. This allows you to implicitly map classes to categories. + * class name. Example App\Aggregates\AccountAggregate would be published + * to an account-uuid stream. This allows you to implicitly map classes + * to categories so that it could be published to account_v2-uuid. */ 'aggregate_category_map' => [], + /** + * If not using aggregates, events need to mapped to streams to be + * published. An example would be the AccoutCreated event + * could be published on to the accounts stream. + */ + 'event_stream_map' => [], + + /** + * If the event is not mapped to a stream, + * publish to this stream by default. + */ + 'default_stream' => env('EVENTSTORE_DEFAULT_STREAM', 'events'), + /** * The stream to listen to when replaying all events. Instead of using * $all, it is recommended to setup a project which emits events @@ -101,7 +115,7 @@ return [ * Laravel Event Sourcing package. You can customise the class to include your * own business logic. It should extend DigitalRisks\Lese\Lese */ - 'lese_class' => env('EVENTSTORE_LESE_CLASS', App\EventStore\Lese::class), + 'lese_class' => env('EVENTSTORE_LESE_CLASS', DigitalRisks\Lese\Lese::class), ]; ``` @@ -245,6 +259,24 @@ php artisan event-sourcing:replay App\\Projectors\\AccountsProjector Learn more how to use Event Sourcing by following the guides at https://docs.spatie.be/laravel-event-sourcing/v3/introduction/ +## Aggregates + +> If you're not using aggregates, you can skip this section. + +In order for the EventStore repositories to fetch the events and/or snapshots related to an aggregate, it needs to know about the aggregate. To do this we simply override the two methods below to initiate the repostiory and pass in the aggregate. + +```php +protected function getStoredEventRepository(): StoredEventRepository +{ + return resolve(EventStoreStoredEventRepository::class, ['aggregate' => $this]); +} + +protected function getSnapshotRepository(): SnapshotRepository +{ + return resolve(EventStoreSnapshotRepository::class, ['aggregate' => $this]); +} +``` + ## Subscribing to Streams The package also includes a long-running process, similar to [Pub / Sub](https://laravel.com/docs/7.x/redis#pubsub) with `php artisan redis:subscribe` whereby you can listen to events from a stream. diff --git a/config/lese.php b/config/lese.php index 75d63ce..56e8555 100644 --- a/config/lese.php +++ b/config/lese.php @@ -14,7 +14,8 @@ 'http_url' => env('EVENTSTORE_HTTP_URL', 'http://admin:changeit@localhost:2113'), /** - * When subscribing to other strea + * Listen to these streams when running `event-sourcing:subscribe`. Uses + * a comma delimetered list from the environment as default. */ 'subscription_streams' => array_filter(explode(',', env('EVENTSTORE_SUBSCRIPTION_STREAMS'))), @@ -24,27 +25,47 @@ 'group' => env('EVENTSTORE_SUBSCRIPTION_GROUP', env('APP_NAME', 'laravel')), /** - * + * By default Aggregate classes are mapped to a category name based on their + * class name. Example App\Aggregates\AccountAggregate would be published + * to an account-uuid stream. This allows you to implicitly map classes + * to categories so that it could be published to account_v2-uuid. */ 'aggregate_category_map' => [], /** - * + * If not using aggregates, events need to be mapped to streams so they can be + * published. For example mapping App\Events\AccountCreated to accounts. + */ + 'event_stream_map' => [], + + /** + * If the event is not mapped to a stream, publish to this stream by default. + */ + 'default_stream' => env('EVENTSTORE_DEFAULT_STREAM', 'events'), + + /** + * The stream to listen to when replaying all events. Instead of using + * $all, it is recommended to setup a project which emits events + * from various streams into a stream specific for your app. */ 'all' => env('EVENTSTORE_ALL', '$all'), /** - * + * Number of events to read in a single API + * call when reconstituting events. */ 'read_size' => env('EVENTSTORE_READ_SIZE', 4096), /** - * + * Number of events to read in a single TCP + * message when replaying all events. */ 'batch_size' => env('EVENTSTORE_BATCH_SIZE', 4096), /** - * + * This class contains a few callbacks to govern the bridge between EventStore and the + * Laravel Event Sourcing package. You can customise the class to include your + * own business logic. It should extend DigitalRisks\Lese\Lese */ 'lese_class' => env('EVENTSTORE_LESE_CLASS', DigitalRisks\Lese\Lese::class), ]; diff --git a/src/EventStoreStoredEventRepository.php b/src/EventStoreStoredEventRepository.php index bc06013..33a0fab 100644 --- a/src/EventStoreStoredEventRepository.php +++ b/src/EventStoreStoredEventRepository.php @@ -107,35 +107,48 @@ public function countAllStartingFrom(int $startingFrom, string $uuid = null): in public function persist(ShouldBeStored $event, string $uuid = null, int $aggregateVersion = null): StoredEvent { - return $this->persistMany([$event], $uuid, $aggregateVersion)[0]; + return $this->persistMany([$event], $uuid, $aggregateVersion)->first(); } public function persistMany(array $events, string $uuid = null, int $aggregateVersion = null): LazyCollection { - $transformedEvents = collect($events)->map(function ($event) use ($uuid) { + // Submit to EventStore + $byStream = collect($events)->groupBy(function (ShouldBeStored $event) use ($uuid) { + return $this->aggregate ? $this->lese->aggregateToStream($this->aggregate, $uuid) : $this->lese->eventToStream($event); + }); + + foreach ($byStream as $stream => $events) + { + $dataEvents = $events->map(function (ShouldBeStored $event) { + $json = app(EventSerializer::class)->serialize(clone $event); + $metadata = $event instanceof HasMetaData ? json_encode($event->collectMetaData()) : '{}'; + + return new EventData(EventId::generate(), get_class($event), true, $json, $metadata); + }); + + $this->eventstore->appendToStream( + $stream, + ExpectedVersion::ANY, + $dataEvents->toArray(), + ); + } + + // Map to StoredEvents + $storedEvents = $events->map(function (ShouldBeStored $event) use ($uuid, $aggregateVersion) { $json = app(EventSerializer::class)->serialize(clone $event); $metadata = $event instanceof HasMetaData ? json_encode($event->collectMetaData()) : '{}'; $metaModel = new StubModel(['meta_data' => $metadata ?: null]); - return [ - 'data' => new EventData(EventId::generate(), get_class($event), true, $json, $metadata), - 'stored' => new StoredEvent([ - 'event_properties' => $json, - 'aggregate_uuid' => $uuid, - 'event_class' => get_class($event), - 'meta_data' => new SchemalessAttributes($metaModel, 'meta_data'), - 'created_at' => Carbon::now(), - ]) - ]; + return new StoredEvent([ + 'event_properties' => $json, + 'aggregate_uuid' => $uuid ?? '', + 'event_class' => get_class($event), + 'meta_data' => new SchemalessAttributes($metaModel, 'meta_data'), + 'created_at' => Carbon::now(), + ]); }); - $this->eventstore->appendToStream( - $this->lese->aggregateToStream($this->aggregate, $uuid), - ExpectedVersion::ANY, - $transformedEvents->pluck('data')->toArray(), - ); - - return new LazyCollection($transformedEvents->pluck('stored')); + return new LazyCollection($storedEvents); } public function update(StoredEvent $storedEvent): StoredEvent diff --git a/src/Lese.php b/src/Lese.php index 2538e61..958d552 100644 --- a/src/Lese.php +++ b/src/Lese.php @@ -13,6 +13,7 @@ use Spatie\EventSourcing\StoredEvent; use Spatie\SchemalessAttributes\SchemalessAttributes; use Prooph\EventStore\ResolvedEvent; +use Spatie\EventSourcing\ShouldBeStored; class Lese { @@ -76,6 +77,17 @@ public function aggregateToStream(AggregateRoot $aggregate, string $uuid) return $this->aggregateToCategory($aggregate) . '-' . $uuid; } + public function eventToStream(ShouldBeStored $event) + { + $class = get_class($event); + + if ($map = config('lese.event_category_map.' . $class)) { + return $map; + } + + return config('lese.default_stream'); + } + public function aggregateToSnapshotStream(AggregateRoot $aggregate, string $uuid) { return '$' . $this->aggregateToCategory($aggregate) . '-' . $uuid . '-snapshot'; @@ -90,7 +102,7 @@ public function aggregateToCategory(AggregateRoot $aggregate) } $base = class_basename($class); - $category = Str::replaceLast('Aggregate', '', $base); + $category = Str::before($base, 'Aggregate'); return Str::snake($category); }