Skip to content

Commit

Permalink
filter events in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Oct 15, 2024
1 parent 9c96e5a commit a2474e5
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 3 deletions.
14 changes: 14 additions & 0 deletions src/Store/Criteria/EventsCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class EventsCriterion
{
public function __construct(
/** @var list<string> */
public readonly array $events,
) {
}
}
6 changes: 6 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -26,6 +27,7 @@
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use PDO;
Expand Down Expand Up @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
6 changes: 6 additions & 0 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -24,6 +25,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
Expand Down Expand Up @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
64 changes: 61 additions & 3 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
Expand All @@ -15,13 +17,15 @@
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Psr\Log\LoggerInterface;
use Throwable;

use function array_keys;
use function count;
use function in_array;
use function sprintf;
Expand All @@ -41,6 +45,7 @@ public function __construct(
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
private readonly EventMetadataFactory|null $eventMetadataFactory = null,
) {
$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
}
Expand Down Expand Up @@ -193,9 +198,15 @@ function ($subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$stream = $this->messageStore->load(
new Criteria(new FromIndexCriterion($startIndex)),
);
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events) {
$criteria = $criteria->add(new EventsCriterion($events));
}

$stream = $this->messageStore->load($criteria);

foreach ($stream as $message) {
$index = $stream->index();
Expand Down Expand Up @@ -363,6 +374,13 @@ function (array $subscriptions) use ($limit): ProcessedResult {

try {
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events) {
$criteria = $criteria->add(new EventsCriterion($events));
}

$stream = $this->messageStore->load($criteria);

foreach ($stream as $message) {
Expand Down Expand Up @@ -1113,4 +1131,44 @@ private function shouldCommitBatch(Subscription $subscription): bool

return $this->batching[$subscription->id()]->forceCommit();
}

/**
* @param list<Subscription> $subscriptions
*
* @return list<string>
*/
private function events(array $subscriptions): array
{
if ($this->eventMetadataFactory === null) {
return [];
}

$eventNames = [];

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
return [];
}

$events = $subscriber->events();

foreach ($events as $event) {
if ($event === '*') {
return [];
}

$metadata = $this->eventMetadataFactory->metadata($event);

$eventNames[$metadata->name] = true;

foreach ($metadata->aliases as $alias) {
$eventNames[$alias] = true;
}
}
}

return array_keys($eventNames);
}
}
7 changes: 7 additions & 0 deletions src/Subscription/Subscriber/MetadataSubscriberAccessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver;

use function array_key_exists;
use function array_keys;
use function array_map;
use function array_merge;

Expand Down Expand Up @@ -66,6 +67,12 @@ public function teardownMethod(): Closure|null
return $this->subscriber->$method(...);
}

/** @return list<class-string|'*'> */
public function events(): array
{
return array_keys($this->metadata->subscribeMethods);
}

/**
* @param class-string $eventClass
*
Expand Down
3 changes: 3 additions & 0 deletions tests/Benchmark/SubscriptionEngineBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
Expand Down Expand Up @@ -63,6 +64,7 @@ public function setUp(): void
$profile = Profile::create($this->id, 'Peter');

for ($i = 1; $i < 10_000; $i++) {
$profile->changeEmail('peter' . $i . '@example.com');
$profile->changeName('Peter ' . $i);
}

Expand All @@ -77,6 +79,7 @@ public function setUp(): void
new SendEmailProcessor(),
],
),
eventMetadataFactory: new AttributeEventMetadataFactory(),
);
}

Expand Down
2 changes: 2 additions & 0 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Patchlevel\EventSourcing\Debug\Trace\TraceStack;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand Down Expand Up @@ -97,6 +98,7 @@ public function testHappyPath(): void
$store,
$subscriptionStore,
new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]),
eventMetadataFactory: new AttributeEventMetadataFactory(),
);

self::assertEquals(
Expand Down

0 comments on commit a2474e5

Please sign in to comment.