Skip to content

Commit

Permalink
introduce message loader in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Oct 15, 2024
1 parent a2474e5 commit bae4966
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 86 deletions.
32 changes: 32 additions & 0 deletions src/Subscription/Engine/DefaultMessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscription;

final class DefaultMessageLoader implements MessageLoader
{
public function __construct(
private readonly Store $store,
) {
}

/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream
{
return $this->store->load(new Criteria(new FromIndexCriterion($startIndex)));
}

public function lastIndex(): int
{
$stream = $this->store->load(null, 1, null, true);

return $stream->index() ?: 0;
}
}
88 changes: 13 additions & 75 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
Expand All @@ -17,15 +13,13 @@
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Psr\Log\LoggerInterface;
use Throwable;

use function array_keys;
use function count;
use function in_array;
use function sprintf;
Expand All @@ -39,14 +33,21 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine
/** @var array<string, BatchableSubscriber> */
private array $batching = [];

private readonly MessageLoader $messageLoader;

public function __construct(
private readonly Store $messageStore,
Store|MessageLoader $messageStore,
SubscriptionStore $subscriptionStore,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
private readonly EventMetadataFactory|null $eventMetadataFactory = null,
) {
if ($messageStore instanceof MessageLoader) {
$this->messageLoader = $messageStore;
} else {
$this->messageLoader = new DefaultMessageLoader($messageStore);
}

$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
}

Expand Down Expand Up @@ -77,7 +78,7 @@ function (array $subscriptions) use ($skipBooting): Result {
/** @var list<Error> $errors */
$errors = [];

$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
Expand Down Expand Up @@ -198,15 +199,7 @@ function ($subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events) {
$criteria = $criteria->add(new EventsCriterion($events));
}

$stream = $this->messageStore->load($criteria);
$stream = $this->messageLoader->load($startIndex, $subscriptions);

foreach ($stream as $message) {
$index = $stream->index();
Expand Down Expand Up @@ -373,15 +366,7 @@ function (array $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events) {
$criteria = $criteria->add(new EventsCriterion($events));
}

$stream = $this->messageStore->load($criteria);
$stream = $this->messageLoader->load($startIndex, $subscriptions);

foreach ($stream as $message) {
$index = $stream->index();
Expand Down Expand Up @@ -964,7 +949,7 @@ function (array $subscriptions): void {

if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) {
if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();
}

$subscription->changePosition($latestIndex);
Expand All @@ -984,13 +969,6 @@ function (array $subscriptions): void {
);
}

private function latestIndex(): int
{
$stream = $this->messageStore->load(null, 1, null, true);

return $stream->index() ?: 0;
}

/** @param list<Subscription> $subscriptions */
private function lowestSubscriptionPosition(array $subscriptions): int
{
Expand Down Expand Up @@ -1131,44 +1109,4 @@ private function shouldCommitBatch(Subscription $subscription): bool

return $this->batching[$subscription->id()]->forceCommit();
}

/**
* @param list<Subscription> $subscriptions
*
* @return list<string>
*/
private function events(array $subscriptions): array
{
if ($this->eventMetadataFactory === null) {
return [];
}

$eventNames = [];

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
return [];
}

$events = $subscriber->events();

foreach ($events as $event) {
if ($event === '*') {
return [];
}

$metadata = $this->eventMetadataFactory->metadata($event);

$eventNames[$metadata->name] = true;

foreach ($metadata->aliases as $alias) {
$eventNames[$alias] = true;
}
}
}

return array_keys($eventNames);
}
}
84 changes: 84 additions & 0 deletions src/Subscription/Engine/EventFilteredMessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;

use function array_keys;

final class EventFilteredMessageLoader implements MessageLoader
{
public function __construct(
private readonly Store $store,
private readonly EventMetadataFactory $eventMetadataFactory,

Check failure on line 23 in src/Subscription/Engine/EventFilteredMessageLoader.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Deptrac (locked, 8.3, ubuntu-latest)

Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader must not depend on Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory (Subscription on MetadataEvent)
private readonly SubscriberAccessorRepository $subscriberRepository,
) {
}

/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream
{
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events !== []) {
$criteria = $criteria->add(new EventsCriterion($events));
}

return $this->store->load($criteria);
}

/**
* @param list<Subscription> $subscriptions
*
* @return list<string>
*/
private function events(array $subscriptions): array
{
$eventNames = [];

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriberRepository->get($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
return [];
}

$events = $subscriber->events();

foreach ($events as $event) {
if ($event === '*') {
return [];
}

$metadata = $this->eventMetadataFactory->metadata($event);

$eventNames[$metadata->name] = true;

foreach ($metadata->aliases as $alias) {
$eventNames[$alias] = true;
}
}
}

return array_keys($eventNames);
}

public function lastIndex(): int
{
$stream = $this->store->load(null, 1, null, true);

return $stream->index() ?: 0;
}
}
16 changes: 16 additions & 0 deletions src/Subscription/Engine/MessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscription;

interface MessageLoader
{
/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream;

public function lastIndex(): int;
}
22 changes: 14 additions & 8 deletions tests/Benchmark/SubscriptionEngineBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
Expand Down Expand Up @@ -70,16 +71,21 @@ public function setUp(): void

$this->repository->save($profile);

$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository(
[
new ProfileProjector($connection),
new SendEmailProcessor(),
],
);

$this->subscriptionEngine = new DefaultSubscriptionEngine(
$this->store,
$subscriptionStore,
new MetadataSubscriberAccessorRepository(
[
new ProfileProjector($connection),
new SendEmailProcessor(),
],
new EventFilteredMessageLoader(
$this->store,
new AttributeEventMetadataFactory(),
$subscriberAccessorRepository,
),
eventMetadataFactory: new AttributeEventMetadataFactory(),
$subscriptionStore,
$subscriberAccessorRepository,
);
}

Expand Down
8 changes: 5 additions & 3 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RunMode;
Expand Down Expand Up @@ -94,11 +95,12 @@ public function testHappyPath(): void

$schemaDirector->create();

$subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]);

$engine = new DefaultSubscriptionEngine(
$store,
new EventFilteredMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository),
$subscriptionStore,
new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]),
eventMetadataFactory: new AttributeEventMetadataFactory(),
$subscriberRepository,
);

self::assertEquals(
Expand Down

0 comments on commit bae4966

Please sign in to comment.