Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spiral/queue] Adding RetryPolicyInterceptor #980

Merged
merged 5 commits into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# CHANGELOG

## 3.8.4 - Unreleased
## Unreleased

- **Other Features**
- [spiral/queue] Added `Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor` to enable automatic job retries
with a configurable retry policy.

## 3.8.4 - 2023-09-08

- **Bug Fixes**
- [spiral/storage] Fixed `visibility` in the Storage configuration
Expand Down
46 changes: 46 additions & 0 deletions src/Queue/src/Attribute/RetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue\Attribute;

use Doctrine\Common\Annotations\Annotation\Attribute;
use Doctrine\Common\Annotations\Annotation\Attributes;
use Doctrine\Common\Annotations\Annotation\Target;
use Spiral\Attributes\NamedArgumentConstructor;
use Spiral\Queue\RetryPolicyInterface;
use Spiral\Queue\RetryPolicy as Policy;

/**
* @Annotation
* @NamedArgumentConstructor
* @Target({"CLASS"})
* @Attributes({
* @Attribute("maxAttempts", type="int"),
* @Attribute("delay", type="int"),
* @Attribute("multiplier", type="float"),
* })
*/
#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
class RetryPolicy
{
/**
* @param 0|positive-int $maxAttempts
* @param positive-int $delay in seconds.
*/
public function __construct(
protected readonly int $maxAttempts = 3,
protected readonly int $delay = 1,
protected readonly float $multiplier = 1
) {
}

public function getRetryPolicy(): RetryPolicyInterface
{
return new Policy(
maxAttempts: $this->maxAttempts,
delay: $this->delay,
multiplier: $this->multiplier
);
}
}
3 changes: 2 additions & 1 deletion src/Queue/src/Bootloader/QueueBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
use Spiral\Queue\Driver\{NullDriver, SyncDriver};
use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler};
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler};
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor};
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
use Spiral\Telemetry\TracerFactoryInterface;

Expand Down Expand Up @@ -164,6 +164,7 @@ private function initQueueConfig(EnvironmentInterface $env): void
'interceptors' => [
'consume' => [
ErrorHandlerInterceptor::class,
RetryPolicyInterceptor::class,
],
'push' => [],
],
Expand Down
14 changes: 14 additions & 0 deletions src/Queue/src/Exception/RetryableExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue\Exception;

use Spiral\Queue\RetryPolicyInterface;

interface RetryableExceptionInterface
{
public function isRetryable(): bool;

public function getRetryPolicy(): ?RetryPolicyInterface;
}
62 changes: 62 additions & 0 deletions src/Queue/src/Interceptor/Consume/RetryPolicyInterceptor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue\Interceptor\Consume;

use Spiral\Attributes\ReaderInterface;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterface;
use Spiral\Queue\Attribute\RetryPolicy as Attribute;
use Spiral\Queue\Exception\JobException;
use Spiral\Queue\Exception\RetryableExceptionInterface;
use Spiral\Queue\Exception\RetryException;
use Spiral\Queue\Options;
use Spiral\Queue\RetryPolicy;

final class RetryPolicyInterceptor implements CoreInterceptorInterface
{
public function __construct(
private readonly ReaderInterface $reader
) {
}

public function process(string $controller, string $action, array $parameters, CoreInterface $core): mixed
{
try {
return $core->callAction($controller, $action, $parameters);
} catch (\Throwable $e) {
$attribute = $this->reader->firstClassMetadata(new \ReflectionClass($controller), Attribute::class);
if ($attribute === null) {
throw $e;
}

$policy = $this->getRetryPolicy($e, $attribute);

$headers = $parameters['headers'] ?? [];
$attempts = (int)($headers['attempts'][0] ?? 0);

if ($policy->isRetryable($e, $attempts) === false) {
throw $e;
}

throw new RetryException(
reason: $e->getMessage(),
options: (new Options())
->withDelay($policy->getDelay($attempts))
->withHeader('attempts', (string)($attempts + 1))
);
}
}

private function getRetryPolicy(\Throwable $exception, Attribute $attribute): RetryPolicy
{
if ($exception instanceof JobException && $exception->getPrevious() !== null) {
$exception = $exception->getPrevious();
}

$policy = $exception instanceof RetryableExceptionInterface ? $exception->getRetryPolicy() : null;

return $policy ?? $attribute->getRetryPolicy();
}
}
77 changes: 77 additions & 0 deletions src/Queue/src/RetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue;

use Spiral\Queue\Exception\InvalidArgumentException;
use Spiral\Queue\Exception\JobException;
use Spiral\Queue\Exception\RetryableExceptionInterface;

final class RetryPolicy implements RetryPolicyInterface
{
/**
* @var positive-int|0
*/
private int $maxAttempts;

/**
* @var positive-int|0
*/
private int $delay;

private float $multiplier;

/**
* @throws InvalidArgumentException
*/
public function __construct(int $maxAttempts, int $delay, float $multiplier = 1)
{
if ($maxAttempts < 0) {
throw new InvalidArgumentException(
\sprintf('Maximum attempts must be greater than or equal to zero: `%s` given.', $maxAttempts)
);
}
$this->maxAttempts = $maxAttempts;

if ($delay < 0) {
throw new InvalidArgumentException(
\sprintf('Delay must be greater than or equal to zero: `%s` given.', $delay)
);
}
$this->delay = $delay;

if ($multiplier < 1) {
throw new InvalidArgumentException(
\sprintf('Multiplier must be greater than zero: `%s` given.', $multiplier)
);
}
$this->multiplier = $multiplier;
}

/**
* @param positive-int|0 $attempts
*
* @return positive-int
*/
public function getDelay(int $attempts = 0): int
{
return (int) \ceil($this->delay * $this->multiplier ** $attempts);
}

/**
* @param positive-int|0 $attempts
*/
public function isRetryable(\Throwable $exception, int $attempts = 0): bool
{
if ($exception instanceof JobException && $exception->getPrevious() !== null) {
$exception = $exception->getPrevious();
}

if (!$exception instanceof RetryableExceptionInterface || $this->maxAttempts === 0) {
return false;
}

return $exception->isRetryable() && $attempts < $this->maxAttempts;
}
}
20 changes: 20 additions & 0 deletions src/Queue/src/RetryPolicyInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue;

interface RetryPolicyInterface
{
/**
* @param positive-int|0 $attempts
*/
public function isRetryable(\Throwable $exception, int $attempts = 0): bool;

/**
* @param positive-int|0 $attempts
*
* @return positive-int
*/
public function getDelay(int $attempts = 0): int;
}
40 changes: 40 additions & 0 deletions src/Queue/tests/Attribute/RetryPolicyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute;

use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\TestCase;
use Spiral\Attributes\Factory;
use Spiral\Queue\Attribute\RetryPolicy;
use Spiral\Tests\Queue\Attribute\Stub\ExtendedRetryPolicy;
use Spiral\Tests\Queue\Attribute\Stub\WithDefaultRetryPolicyAnnotation;
use Spiral\Tests\Queue\Attribute\Stub\WithDefaultRetryPolicyAttribute;
use Spiral\Tests\Queue\Attribute\Stub\WithExtendedRetryPolicyAnnotation;
use Spiral\Tests\Queue\Attribute\Stub\WithExtendedRetryPolicyAttribute;
use Spiral\Tests\Queue\Attribute\Stub\WithoutRetryPolicy;
use Spiral\Tests\Queue\Attribute\Stub\WithRetryPolicyAnnotation;
use Spiral\Tests\Queue\Attribute\Stub\WithRetryPolicyAttribute;

final class RetryPolicyTest extends TestCase
{
#[DataProvider('classesProvider')]
public function testRetryPolicy(string $class, ?RetryPolicy $expected): void
{
$reader = (new Factory())->create();

$this->assertEquals($expected, $reader->firstClassMetadata(new \ReflectionClass($class), RetryPolicy::class));
}

public static function classesProvider(): \Traversable
{
yield [WithoutRetryPolicy::class, null];
yield [WithDefaultRetryPolicyAnnotation::class, new RetryPolicy()];
yield [WithDefaultRetryPolicyAttribute::class, new RetryPolicy()];
yield [WithRetryPolicyAnnotation::class, new RetryPolicy(5, 3_000, 2.5)];
yield [WithRetryPolicyAttribute::class, new RetryPolicy(5, 3_000, 2.5)];
yield [WithExtendedRetryPolicyAttribute::class, new ExtendedRetryPolicy()];
yield [WithExtendedRetryPolicyAnnotation::class, new ExtendedRetryPolicy()];
}
}
23 changes: 23 additions & 0 deletions src/Queue/tests/Attribute/Stub/ExtendedRetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Doctrine\Common\Annotations\Annotation\Target;
use Spiral\Attributes\NamedArgumentConstructor;
use Spiral\Queue\Attribute\RetryPolicy;

/**
* @Annotation
* @NamedArgumentConstructor
* @Target({"CLASS"})
*/
#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
final class ExtendedRetryPolicy extends RetryPolicy
{
public function __construct()
{
parent::__construct(maxAttempts: 10, delay: 12, multiplier: 5);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

/**
* @RetryPolicy
*/
final class WithDefaultRetryPolicyAnnotation
{
}
12 changes: 12 additions & 0 deletions src/Queue/tests/Attribute/Stub/WithDefaultRetryPolicyAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

#[RetryPolicy]
final class WithDefaultRetryPolicyAttribute
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

/**
* @ExtendedRetryPolicy
*/
final class WithExtendedRetryPolicyAnnotation
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

#[ExtendedRetryPolicy]
final class WithExtendedRetryPolicyAttribute
{
}
14 changes: 14 additions & 0 deletions src/Queue/tests/Attribute/Stub/WithRetryPolicyAnnotation.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

/**
* @RetryPolicy(maxAttempts=5, delay=3000, multiplier=2.5)
*/
final class WithRetryPolicyAnnotation
{
}
12 changes: 12 additions & 0 deletions src/Queue/tests/Attribute/Stub/WithRetryPolicyAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

#[RetryPolicy(maxAttempts: 5, delay: 3_000, multiplier: 2.5)]
final class WithRetryPolicyAttribute
{
}
Loading
Loading