Skip to content

Commit

Permalink
feat: Support not using aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
morrislaptop committed May 20, 2020
1 parent 2e6ebf3 commit 61dbfbe
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 29 deletions.
38 changes: 35 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,25 @@ return [

/**
* By default Aggregate classes are mapped to a category name based on their
* class name. Example AccountAggregate would be published to an account
* stream. This allows you to implicitly map classes to categories.
* class name. Example App\Aggregates\AccountAggregate would be published
* to an account-uuid stream. This allows you to implicitly map classes
* to categories so that it could be published to account_v2-uuid.
*/
'aggregate_category_map' => [],

/**
* If not using aggregates, events need to mapped to streams to be
* published. An example would be the AccoutCreated event
* could be published on to the accounts stream.
*/
'event_stream_map' => [],

/**
* If the event is not mapped to a stream,
* publish to this stream by default.
*/
'default_stream' => env('EVENTSTORE_DEFAULT_STREAM', 'events'),

/**
* The stream to listen to when replaying all events. Instead of using
* $all, it is recommended to setup a project which emits events
Expand All @@ -101,7 +115,7 @@ return [
* Laravel Event Sourcing package. You can customise the class to include your
* own business logic. It should extend DigitalRisks\Lese\Lese
*/
'lese_class' => env('EVENTSTORE_LESE_CLASS', App\EventStore\Lese::class),
'lese_class' => env('EVENTSTORE_LESE_CLASS', DigitalRisks\Lese\Lese::class),
];
```

Expand Down Expand Up @@ -245,6 +259,24 @@ php artisan event-sourcing:replay App\\Projectors\\AccountsProjector

Learn more how to use Event Sourcing by following the guides at https://docs.spatie.be/laravel-event-sourcing/v3/introduction/

## Aggregates

> If you're not using aggregates, you can skip this section.
In order for the EventStore repositories to fetch the events and/or snapshots related to an aggregate, it needs to know about the aggregate. To do this we simply override the two methods below to initiate the repostiory and pass in the aggregate.

```php
protected function getStoredEventRepository(): StoredEventRepository
{
return resolve(EventStoreStoredEventRepository::class, ['aggregate' => $this]);
}

protected function getSnapshotRepository(): SnapshotRepository
{
return resolve(EventStoreSnapshotRepository::class, ['aggregate' => $this]);
}
```

## Subscribing to Streams

The package also includes a long-running process, similar to [Pub / Sub](https://laravel.com/docs/7.x/redis#pubsub) with `php artisan redis:subscribe` whereby you can listen to events from a stream.
Expand Down
33 changes: 27 additions & 6 deletions config/lese.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
'http_url' => env('EVENTSTORE_HTTP_URL', 'http://admin:changeit@localhost:2113'),

/**
* When subscribing to other strea
* Listen to these streams when running `event-sourcing:subscribe`. Uses
* a comma delimetered list from the environment as default.
*/
'subscription_streams' => array_filter(explode(',', env('EVENTSTORE_SUBSCRIPTION_STREAMS'))),

Expand All @@ -24,27 +25,47 @@
'group' => env('EVENTSTORE_SUBSCRIPTION_GROUP', env('APP_NAME', 'laravel')),

/**
*
* By default Aggregate classes are mapped to a category name based on their
* class name. Example App\Aggregates\AccountAggregate would be published
* to an account-uuid stream. This allows you to implicitly map classes
* to categories so that it could be published to account_v2-uuid.
*/
'aggregate_category_map' => [],

/**
*
* If not using aggregates, events need to be mapped to streams so they can be
* published. For example mapping App\Events\AccountCreated to accounts.
*/
'event_stream_map' => [],

/**
* If the event is not mapped to a stream, publish to this stream by default.
*/
'default_stream' => env('EVENTSTORE_DEFAULT_STREAM', 'events'),

/**
* The stream to listen to when replaying all events. Instead of using
* $all, it is recommended to setup a project which emits events
* from various streams into a stream specific for your app.
*/
'all' => env('EVENTSTORE_ALL', '$all'),

/**
*
* Number of events to read in a single API
* call when reconstituting events.
*/
'read_size' => env('EVENTSTORE_READ_SIZE', 4096),

/**
*
* Number of events to read in a single TCP
* message when replaying all events.
*/
'batch_size' => env('EVENTSTORE_BATCH_SIZE', 4096),

/**
*
* This class contains a few callbacks to govern the bridge between EventStore and the
* Laravel Event Sourcing package. You can customise the class to include your
* own business logic. It should extend DigitalRisks\Lese\Lese
*/
'lese_class' => env('EVENTSTORE_LESE_CLASS', DigitalRisks\Lese\Lese::class),
];
51 changes: 32 additions & 19 deletions src/EventStoreStoredEventRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,35 +107,48 @@ public function countAllStartingFrom(int $startingFrom, string $uuid = null): in

public function persist(ShouldBeStored $event, string $uuid = null, int $aggregateVersion = null): StoredEvent
{
return $this->persistMany([$event], $uuid, $aggregateVersion)[0];
return $this->persistMany([$event], $uuid, $aggregateVersion)->first();
}

public function persistMany(array $events, string $uuid = null, int $aggregateVersion = null): LazyCollection
{
$transformedEvents = collect($events)->map(function ($event) use ($uuid) {
// Submit to EventStore
$byStream = collect($events)->groupBy(function (ShouldBeStored $event) use ($uuid) {
return $this->aggregate ? $this->lese->aggregateToStream($this->aggregate, $uuid) : $this->lese->eventToStream($event);
});

foreach ($byStream as $stream => $events)
{
$dataEvents = $events->map(function (ShouldBeStored $event) {
$json = app(EventSerializer::class)->serialize(clone $event);
$metadata = $event instanceof HasMetaData ? json_encode($event->collectMetaData()) : '{}';

return new EventData(EventId::generate(), get_class($event), true, $json, $metadata);
});

$this->eventstore->appendToStream(
$stream,
ExpectedVersion::ANY,
$dataEvents->toArray(),
);
}

// Map to StoredEvents
$storedEvents = $events->map(function (ShouldBeStored $event) use ($uuid, $aggregateVersion) {
$json = app(EventSerializer::class)->serialize(clone $event);
$metadata = $event instanceof HasMetaData ? json_encode($event->collectMetaData()) : '{}';
$metaModel = new StubModel(['meta_data' => $metadata ?: null]);

return [
'data' => new EventData(EventId::generate(), get_class($event), true, $json, $metadata),
'stored' => new StoredEvent([
'event_properties' => $json,
'aggregate_uuid' => $uuid,
'event_class' => get_class($event),
'meta_data' => new SchemalessAttributes($metaModel, 'meta_data'),
'created_at' => Carbon::now(),
])
];
return new StoredEvent([
'event_properties' => $json,
'aggregate_uuid' => $uuid ?? '',
'event_class' => get_class($event),
'meta_data' => new SchemalessAttributes($metaModel, 'meta_data'),
'created_at' => Carbon::now(),
]);
});

$this->eventstore->appendToStream(
$this->lese->aggregateToStream($this->aggregate, $uuid),
ExpectedVersion::ANY,
$transformedEvents->pluck('data')->toArray(),
);

return new LazyCollection($transformedEvents->pluck('stored'));
return new LazyCollection($storedEvents);
}

public function update(StoredEvent $storedEvent): StoredEvent
Expand Down
14 changes: 13 additions & 1 deletion src/Lese.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Spatie\EventSourcing\StoredEvent;
use Spatie\SchemalessAttributes\SchemalessAttributes;
use Prooph\EventStore\ResolvedEvent;
use Spatie\EventSourcing\ShouldBeStored;

class Lese
{
Expand Down Expand Up @@ -76,6 +77,17 @@ public function aggregateToStream(AggregateRoot $aggregate, string $uuid)
return $this->aggregateToCategory($aggregate) . '-' . $uuid;
}

public function eventToStream(ShouldBeStored $event)
{
$class = get_class($event);

if ($map = config('lese.event_category_map.' . $class)) {
return $map;
}

return config('lese.default_stream');
}

public function aggregateToSnapshotStream(AggregateRoot $aggregate, string $uuid)
{
return '$' . $this->aggregateToCategory($aggregate) . '-' . $uuid . '-snapshot';
Expand All @@ -90,7 +102,7 @@ public function aggregateToCategory(AggregateRoot $aggregate)
}

$base = class_basename($class);
$category = Str::replaceLast('Aggregate', '', $base);
$category = Str::before($base, 'Aggregate');

return Str::snake($category);
}
Expand Down

0 comments on commit 61dbfbe

Please sign in to comment.