From a2474e510b98654b449a2adf3c267d283a0401b5 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 6 Oct 2024 10:13:14 +0200 Subject: [PATCH] filter events in subscription engine --- src/Store/Criteria/EventsCriterion.php | 14 ++++ src/Store/DoctrineDbalStore.php | 6 ++ src/Store/StreamDoctrineDbalStore.php | 6 ++ .../Engine/DefaultSubscriptionEngine.php | 64 ++++++++++++++++++- .../Subscriber/MetadataSubscriberAccessor.php | 7 ++ tests/Benchmark/SubscriptionEngineBench.php | 3 + .../Subscription/SubscriptionTest.php | 2 + 7 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 src/Store/Criteria/EventsCriterion.php diff --git a/src/Store/Criteria/EventsCriterion.php b/src/Store/Criteria/EventsCriterion.php new file mode 100644 index 00000000..310d7461 --- /dev/null +++ b/src/Store/Criteria/EventsCriterion.php @@ -0,0 +1,14 @@ + */ + public readonly array $events, + ) { + } +} diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 2a410427..1333dbac 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Store; use Closure; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\MariaDBPlatform; @@ -26,6 +27,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use PDO; @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void $builder->andWhere('id > :index'); $builder->setParameter('index', $criterion->fromIndex, Types::INTEGER); break; + case EventsCriterion::class: + $builder->andWhere('event IN (:events)'); + $builder->setParameter('events', $criterion->events, ArrayParameterType::STRING); + break; default: throw new UnsupportedCriterion($criterion::class); } diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index d9c1cd16..cd95fd27 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Store; use Closure; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\MariaDBPlatform; @@ -24,6 +25,7 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void $builder->andWhere('id > :index'); $builder->setParameter('index', $criterion->fromIndex, Types::INTEGER); break; + case EventsCriterion::class: + $builder->andWhere('event IN (:events)'); + $builder->setParameter('events', $criterion->events, ArrayParameterType::STRING); + break; default: throw new UnsupportedCriterion($criterion::class); } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index e4ffbd33..2f7ca666 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -5,7 +5,9 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; @@ -15,6 +17,7 @@ use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; +use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; @@ -22,6 +25,7 @@ use Psr\Log\LoggerInterface; use Throwable; +use function array_keys; use function count; use function in_array; use function sprintf; @@ -41,6 +45,7 @@ public function __construct( private readonly SubscriberAccessorRepository $subscriberRepository, private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly LoggerInterface|null $logger = null, + private readonly EventMetadataFactory|null $eventMetadataFactory = null, ) { $this->subscriptionManager = new SubscriptionManager($subscriptionStore); } @@ -193,9 +198,15 @@ function ($subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $stream = $this->messageStore->load( - new Criteria(new FromIndexCriterion($startIndex)), - ); + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + + $stream = $this->messageStore->load($criteria); foreach ($stream as $message) { $index = $stream->index(); @@ -363,6 +374,13 @@ function (array $subscriptions) use ($limit): ProcessedResult { try { $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + $stream = $this->messageStore->load($criteria); foreach ($stream as $message) { @@ -1113,4 +1131,44 @@ private function shouldCommitBatch(Subscription $subscription): bool return $this->batching[$subscription->id()]->forceCommit(); } + + /** + * @param list $subscriptions + * + * @return list + */ + private function events(array $subscriptions): array + { + if ($this->eventMetadataFactory === null) { + return []; + } + + $eventNames = []; + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriber($subscription->id()); + + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return []; + } + + $events = $subscriber->events(); + + foreach ($events as $event) { + if ($event === '*') { + return []; + } + + $metadata = $this->eventMetadataFactory->metadata($event); + + $eventNames[$metadata->name] = true; + + foreach ($metadata->aliases as $alias) { + $eventNames[$alias] = true; + } + } + } + + return array_keys($eventNames); + } } diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php index a97ac8a5..2a1771b7 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; use function array_key_exists; +use function array_keys; use function array_map; use function array_merge; @@ -66,6 +67,12 @@ public function teardownMethod(): Closure|null return $this->subscriber->$method(...); } + /** @return list */ + public function events(): array + { + return array_keys($this->metadata->subscribeMethods); + } + /** * @param class-string $eventClass * diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index fe5fdfde..2dc0631d 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; +use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; @@ -63,6 +64,7 @@ public function setUp(): void $profile = Profile::create($this->id, 'Peter'); for ($i = 1; $i < 10_000; $i++) { + $profile->changeEmail('peter' . $i . '@example.com'); $profile->changeName('Peter ' . $i); } @@ -77,6 +79,7 @@ public function setUp(): void new SendEmailProcessor(), ], ), + eventMetadataFactory: new AttributeEventMetadataFactory(), ); } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 08af8133..56db3700 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -17,6 +17,7 @@ use Patchlevel\EventSourcing\Debug\Trace\TraceStack; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -97,6 +98,7 @@ public function testHappyPath(): void $store, $subscriptionStore, new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), + eventMetadataFactory: new AttributeEventMetadataFactory(), ); self::assertEquals(