Skip to content

Commit

Permalink
Merge pull request #632 from patchlevel/batch-subscription
Browse files Browse the repository at this point in the history
add batch subscription
  • Loading branch information
DavidBadura authored Oct 15, 2024
2 parents fb72a65 + 1b79e7d commit 9c96e5a
Show file tree
Hide file tree
Showing 18 changed files with 1,868 additions and 348 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ test: phpunit

.PHONY: benchmark
benchmark: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --report=default
DB_URL=sqlite3:///:memory: php -d memory_limit=512M vendor/bin/phpbench run tests/Benchmark --report=default

.PHONY: benchmark-base
benchmark-base: vendor ## run benchmarks
Expand Down
21 changes: 18 additions & 3 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@
<code><![CDATA[$store]]></code>
</MissingConstructor>
</file>
<file src="tests/Benchmark/SubscriptionEngineBatchBench.php">
<MissingConstructor>
<code><![CDATA[$id]]></code>
<code><![CDATA[$repository]]></code>
<code><![CDATA[$store]]></code>
<code><![CDATA[$subscriptionEngine]]></code>
</MissingConstructor>
</file>
<file src="tests/Benchmark/SubscriptionEngineBench.php">
<MissingConstructor>
<code><![CDATA[$id]]></code>
Expand Down Expand Up @@ -209,9 +217,6 @@
<MissingParamType>
<code><![CDATA[$message]]></code>
</MissingParamType>
<ReservedWord>
<code><![CDATA[ProfileVisited&ProfileCreated $event]]></code>
</ReservedWord>
</file>
<file src="tests/Unit/Schema/DoctrineSchemaSubscriberTest.php">
<DeprecatedClass>
Expand Down Expand Up @@ -324,6 +329,16 @@
</InternalMethod>
</file>
<file src="tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php">
<PossiblyNullArgument>
<code><![CDATA[$subscriber->throwForBeginBatch]]></code>
<code><![CDATA[$subscriber->throwForBeginBatch]]></code>
<code><![CDATA[$subscriber->throwForCommitBatch]]></code>
<code><![CDATA[$subscriber->throwForCommitBatch]]></code>
<code><![CDATA[$subscriber->throwForMessage]]></code>
<code><![CDATA[$subscriber->throwForMessage]]></code>
<code><![CDATA[$subscriber->throwForMessage]]></code>
<code><![CDATA[$subscriber->throwForMessage]]></code>
</PossiblyNullArgument>
<PossiblyUndefinedArrayOffset>
<code><![CDATA[$update1]]></code>
</PossiblyUndefinedArrayOffset>
Expand Down
91 changes: 91 additions & 0 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,97 @@ final class MigrationSubscriber
// ...
}
```
### Batching

You can also optimize the performance of your subscribers by processing a number of events in a batch.
This is particularly useful when projections need to be rebuilt.
To achieve this, you can implement the `BatchableSubscriber` interface.

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;

#[Projector('profile_1')]
final class MigrationSubscriber implements BatchableSubscriber
{
public function __construct(
private readonly Connection $connection,
) {
}

/** @var array<string, int> */
private array $nameChanged = [];

#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $event): void
{
$this->nameChanged[$event->userId] = $event->name;
}

public function beginBatch(): void
{
$this->nameChanged = [];
$this->connection->beginTransaction();
}

public function commitBatch(): void
{
foreach ($this->nameChanged as $userId => $name) {
$this->connection->executeStatement(
'UPDATE user SET name = :name WHERE id = :id',
['name' => $name, 'id' => $userId],
);
}

$this->connection->commit();
$this->nameChanged = [];
}

public function rollbackBatch(): void
{
$this->connection->rollBack();
}

public function forceCommit(): bool
{
return count($this->nameChanged) > 1000;
}
}
```
This interface provides you with all the options you need to process your data collectively.

The `beginBatch` method is called as soon as a subscriber wants to process an event.
If no suitable event is found in the stream, batching will not start, and this method will not be called.
Here, you can make all necessary preparations, such as opening a transaction or preparing variables.

The `commitBatch` method is called when batching was previously started, and one of the following conditions is met:
Either the Subscription Engine reaches its limit, or the stream is finished.
Alternatively, if the subscriber explicitly indicates using the `forceCommit` method that they want to process the data now.
At this step, you must process all the data.

The `rollbackBatch` method is called when an error occurs and the batching needs to be aborted.
Here, you can respond to the error and potentially perform a database rollback.

The method `forceCommit` is called after each handled event,
and you can decide whether the batch commit process should start now.
This helps to determine the batch size and thus avoid memory overflow.

!!! danger

Make sure to fully process the data in `commitBatch` and close any open transactions.
Otherwise, it may lead to inconsistent data.

!!! note

The position of the subscriber is only updated after a successful commit.
In case of an error, the position remains at the state before the batch started.

!!! tip

Use `forceCommit` to prevent memory leaks.
This allows you to decide when it's suitable to process the data and then release the memory.

## Subscription Engine

The subscription engine manages individual subscribers and keeps the subscriptions running.
Expand Down
20 changes: 10 additions & 10 deletions src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public function metadata(string $subscriber): SubscriberMetadata
$methods = $reflector->getMethods();

$subscribeMethods = [];
$createMethod = null;
$dropMethod = null;
$setupMethod = null;
$teardownMethod = null;

foreach ($methods as $method) {
$attributes = $method->getAttributes(Subscribe::class);
Expand All @@ -54,39 +54,39 @@ public function metadata(string $subscriber): SubscriberMetadata
}

if ($method->getAttributes(Setup::class)) {
if ($createMethod !== null) {
if ($setupMethod !== null) {
throw new DuplicateSetupMethod(
$subscriber,
$createMethod,
$setupMethod,
$method->getName(),
);
}

$createMethod = $method->getName();
$setupMethod = $method->getName();
}

if (!$method->getAttributes(Teardown::class)) {
continue;
}

if ($dropMethod !== null) {
if ($teardownMethod !== null) {
throw new DuplicateTeardownMethod(
$subscriber,
$dropMethod,
$teardownMethod,
$method->getName(),
);
}

$dropMethod = $method->getName();
$teardownMethod = $method->getName();
}

$metadata = new SubscriberMetadata(
$subscriberInfo->id,
$subscriberInfo->group,
$subscriberInfo->runMode,
$subscribeMethods,
$createMethod,
$dropMethod,
$setupMethod,
$teardownMethod,
);

$this->subscriberMetadata[$subscriber] = $metadata;
Expand Down
Loading

0 comments on commit 9c96e5a

Please sign in to comment.