Skip to content

Commit

Permalink
Merge pull request #980 from msmakouz/feature/retry
Browse files Browse the repository at this point in the history
[spiral/queue] Adding `RetryPolicyInterceptor`
  • Loading branch information
butschster authored Sep 9, 2023
2 parents 0c4ba5e + fc3c61c commit 7386638
Show file tree
Hide file tree
Showing 20 changed files with 646 additions and 2 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# CHANGELOG

## 3.8.4 - Unreleased
## Unreleased

- **Other Features**
- [spiral/queue] Added `Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor` to enable automatic job retries
with a configurable retry policy.

## 3.8.4 - 2023-09-08

- **Bug Fixes**
- [spiral/storage] Fixed `visibility` in the Storage configuration
Expand Down
46 changes: 46 additions & 0 deletions src/Queue/src/Attribute/RetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue\Attribute;

use Doctrine\Common\Annotations\Annotation\Attribute;
use Doctrine\Common\Annotations\Annotation\Attributes;
use Doctrine\Common\Annotations\Annotation\Target;
use Spiral\Attributes\NamedArgumentConstructor;
use Spiral\Queue\RetryPolicyInterface;
use Spiral\Queue\RetryPolicy as Policy;

/**
* @Annotation
* @NamedArgumentConstructor
* @Target({"CLASS"})
* @Attributes({
* @Attribute("maxAttempts", type="int"),
* @Attribute("delay", type="int"),
* @Attribute("multiplier", type="float"),
* })
*/
#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
class RetryPolicy
{
/**
* @param 0|positive-int $maxAttempts
* @param positive-int $delay in seconds.
*/
public function __construct(
protected readonly int $maxAttempts = 3,
protected readonly int $delay = 1,
protected readonly float $multiplier = 1
) {
}

public function getRetryPolicy(): RetryPolicyInterface
{
return new Policy(
maxAttempts: $this->maxAttempts,
delay: $this->delay,
multiplier: $this->multiplier
);
}
}
3 changes: 2 additions & 1 deletion src/Queue/src/Bootloader/QueueBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
use Spiral\Queue\Driver\{NullDriver, SyncDriver};
use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler};
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler};
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor};
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
use Spiral\Telemetry\TracerFactoryInterface;

Expand Down Expand Up @@ -164,6 +164,7 @@ private function initQueueConfig(EnvironmentInterface $env): void
'interceptors' => [
'consume' => [
ErrorHandlerInterceptor::class,
RetryPolicyInterceptor::class,
],
'push' => [],
],
Expand Down
14 changes: 14 additions & 0 deletions src/Queue/src/Exception/RetryableExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue\Exception;

use Spiral\Queue\RetryPolicyInterface;

interface RetryableExceptionInterface
{
public function isRetryable(): bool;

public function getRetryPolicy(): ?RetryPolicyInterface;
}
62 changes: 62 additions & 0 deletions src/Queue/src/Interceptor/Consume/RetryPolicyInterceptor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue\Interceptor\Consume;

use Spiral\Attributes\ReaderInterface;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterface;
use Spiral\Queue\Attribute\RetryPolicy as Attribute;
use Spiral\Queue\Exception\JobException;
use Spiral\Queue\Exception\RetryableExceptionInterface;
use Spiral\Queue\Exception\RetryException;
use Spiral\Queue\Options;
use Spiral\Queue\RetryPolicy;

final class RetryPolicyInterceptor implements CoreInterceptorInterface
{
public function __construct(
private readonly ReaderInterface $reader
) {
}

public function process(string $controller, string $action, array $parameters, CoreInterface $core): mixed
{
try {
return $core->callAction($controller, $action, $parameters);
} catch (\Throwable $e) {
$attribute = $this->reader->firstClassMetadata(new \ReflectionClass($controller), Attribute::class);
if ($attribute === null) {
throw $e;
}

$policy = $this->getRetryPolicy($e, $attribute);

$headers = $parameters['headers'] ?? [];
$attempts = (int)($headers['attempts'][0] ?? 0);

if ($policy->isRetryable($e, $attempts) === false) {
throw $e;
}

throw new RetryException(
reason: $e->getMessage(),
options: (new Options())
->withDelay($policy->getDelay($attempts))
->withHeader('attempts', (string)($attempts + 1))
);
}
}

private function getRetryPolicy(\Throwable $exception, Attribute $attribute): RetryPolicy
{
if ($exception instanceof JobException && $exception->getPrevious() !== null) {
$exception = $exception->getPrevious();
}

$policy = $exception instanceof RetryableExceptionInterface ? $exception->getRetryPolicy() : null;

return $policy ?? $attribute->getRetryPolicy();
}
}
77 changes: 77 additions & 0 deletions src/Queue/src/RetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue;

use Spiral\Queue\Exception\InvalidArgumentException;
use Spiral\Queue\Exception\JobException;
use Spiral\Queue\Exception\RetryableExceptionInterface;

final class RetryPolicy implements RetryPolicyInterface
{
/**
* @var positive-int|0
*/
private int $maxAttempts;

/**
* @var positive-int|0
*/
private int $delay;

private float $multiplier;

/**
* @throws InvalidArgumentException
*/
public function __construct(int $maxAttempts, int $delay, float $multiplier = 1)
{
if ($maxAttempts < 0) {
throw new InvalidArgumentException(
\sprintf('Maximum attempts must be greater than or equal to zero: `%s` given.', $maxAttempts)
);
}
$this->maxAttempts = $maxAttempts;

if ($delay < 0) {
throw new InvalidArgumentException(
\sprintf('Delay must be greater than or equal to zero: `%s` given.', $delay)
);
}
$this->delay = $delay;

if ($multiplier < 1) {
throw new InvalidArgumentException(
\sprintf('Multiplier must be greater than zero: `%s` given.', $multiplier)
);
}
$this->multiplier = $multiplier;
}

/**
* @param positive-int|0 $attempts
*
* @return positive-int
*/
public function getDelay(int $attempts = 0): int
{
return (int) \ceil($this->delay * $this->multiplier ** $attempts);
}

/**
* @param positive-int|0 $attempts
*/
public function isRetryable(\Throwable $exception, int $attempts = 0): bool
{
if ($exception instanceof JobException && $exception->getPrevious() !== null) {
$exception = $exception->getPrevious();
}

if (!$exception instanceof RetryableExceptionInterface || $this->maxAttempts === 0) {
return false;
}

return $exception->isRetryable() && $attempts < $this->maxAttempts;
}
}
20 changes: 20 additions & 0 deletions src/Queue/src/RetryPolicyInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Spiral\Queue;

interface RetryPolicyInterface
{
/**
* @param positive-int|0 $attempts
*/
public function isRetryable(\Throwable $exception, int $attempts = 0): bool;

/**
* @param positive-int|0 $attempts
*
* @return positive-int
*/
public function getDelay(int $attempts = 0): int;
}
40 changes: 40 additions & 0 deletions src/Queue/tests/Attribute/RetryPolicyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute;

use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\TestCase;
use Spiral\Attributes\Factory;
use Spiral\Queue\Attribute\RetryPolicy;
use Spiral\Tests\Queue\Attribute\Stub\ExtendedRetryPolicy;
use Spiral\Tests\Queue\Attribute\Stub\WithDefaultRetryPolicyAnnotation;
use Spiral\Tests\Queue\Attribute\Stub\WithDefaultRetryPolicyAttribute;
use Spiral\Tests\Queue\Attribute\Stub\WithExtendedRetryPolicyAnnotation;
use Spiral\Tests\Queue\Attribute\Stub\WithExtendedRetryPolicyAttribute;
use Spiral\Tests\Queue\Attribute\Stub\WithoutRetryPolicy;
use Spiral\Tests\Queue\Attribute\Stub\WithRetryPolicyAnnotation;
use Spiral\Tests\Queue\Attribute\Stub\WithRetryPolicyAttribute;

final class RetryPolicyTest extends TestCase
{
#[DataProvider('classesProvider')]
public function testRetryPolicy(string $class, ?RetryPolicy $expected): void
{
$reader = (new Factory())->create();

$this->assertEquals($expected, $reader->firstClassMetadata(new \ReflectionClass($class), RetryPolicy::class));
}

public static function classesProvider(): \Traversable
{
yield [WithoutRetryPolicy::class, null];
yield [WithDefaultRetryPolicyAnnotation::class, new RetryPolicy()];
yield [WithDefaultRetryPolicyAttribute::class, new RetryPolicy()];
yield [WithRetryPolicyAnnotation::class, new RetryPolicy(5, 3_000, 2.5)];
yield [WithRetryPolicyAttribute::class, new RetryPolicy(5, 3_000, 2.5)];
yield [WithExtendedRetryPolicyAttribute::class, new ExtendedRetryPolicy()];
yield [WithExtendedRetryPolicyAnnotation::class, new ExtendedRetryPolicy()];
}
}
23 changes: 23 additions & 0 deletions src/Queue/tests/Attribute/Stub/ExtendedRetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Doctrine\Common\Annotations\Annotation\Target;
use Spiral\Attributes\NamedArgumentConstructor;
use Spiral\Queue\Attribute\RetryPolicy;

/**
* @Annotation
* @NamedArgumentConstructor
* @Target({"CLASS"})
*/
#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
final class ExtendedRetryPolicy extends RetryPolicy
{
public function __construct()
{
parent::__construct(maxAttempts: 10, delay: 12, multiplier: 5);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

/**
* @RetryPolicy
*/
final class WithDefaultRetryPolicyAnnotation
{
}
12 changes: 12 additions & 0 deletions src/Queue/tests/Attribute/Stub/WithDefaultRetryPolicyAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

#[RetryPolicy]
final class WithDefaultRetryPolicyAttribute
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

/**
* @ExtendedRetryPolicy
*/
final class WithExtendedRetryPolicyAnnotation
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

#[ExtendedRetryPolicy]
final class WithExtendedRetryPolicyAttribute
{
}
14 changes: 14 additions & 0 deletions src/Queue/tests/Attribute/Stub/WithRetryPolicyAnnotation.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

/**
* @RetryPolicy(maxAttempts=5, delay=3000, multiplier=2.5)
*/
final class WithRetryPolicyAnnotation
{
}
12 changes: 12 additions & 0 deletions src/Queue/tests/Attribute/Stub/WithRetryPolicyAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\Tests\Queue\Attribute\Stub;

use Spiral\Queue\Attribute\RetryPolicy;

#[RetryPolicy(maxAttempts: 5, delay: 3_000, multiplier: 2.5)]
final class WithRetryPolicyAttribute
{
}
Loading

0 comments on commit 7386638

Please sign in to comment.