diff --git a/CHANGELOG.md b/CHANGELOG.md index 24f6af2cc..c49001678 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ # CHANGELOG -## 3.8.4 - Unreleased +## Unreleased + +- **Other Features** + - [spiral/queue] Added `Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor` to enable automatic job retries + with a configurable retry policy. + +## 3.8.4 - 2023-09-08 - **Bug Fixes** - [spiral/storage] Fixed `visibility` in the Storage configuration diff --git a/src/Queue/src/Attribute/RetryPolicy.php b/src/Queue/src/Attribute/RetryPolicy.php new file mode 100644 index 000000000..978e37f22 --- /dev/null +++ b/src/Queue/src/Attribute/RetryPolicy.php @@ -0,0 +1,46 @@ +maxAttempts, + delay: $this->delay, + multiplier: $this->multiplier + ); + } +} diff --git a/src/Queue/src/Bootloader/QueueBootloader.php b/src/Queue/src/Bootloader/QueueBootloader.php index d71e2dee6..9974cb24c 100644 --- a/src/Queue/src/Bootloader/QueueBootloader.php +++ b/src/Queue/src/Bootloader/QueueBootloader.php @@ -25,7 +25,7 @@ use Spiral\Queue\Driver\{NullDriver, SyncDriver}; use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler}; use Spiral\Queue\HandlerRegistryInterface; -use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler}; +use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor}; use Spiral\Telemetry\Bootloader\TelemetryBootloader; use Spiral\Telemetry\TracerFactoryInterface; @@ -164,6 +164,7 @@ private function initQueueConfig(EnvironmentInterface $env): void 'interceptors' => [ 'consume' => [ ErrorHandlerInterceptor::class, + RetryPolicyInterceptor::class, ], 'push' => [], ], diff --git a/src/Queue/src/Exception/RetryableExceptionInterface.php b/src/Queue/src/Exception/RetryableExceptionInterface.php new file mode 100644 index 000000000..445cec3cb --- /dev/null +++ b/src/Queue/src/Exception/RetryableExceptionInterface.php @@ -0,0 +1,14 @@ +callAction($controller, $action, $parameters); + } catch (\Throwable $e) { + $attribute = $this->reader->firstClassMetadata(new \ReflectionClass($controller), Attribute::class); + if ($attribute === null) { + throw $e; + } + + $policy = $this->getRetryPolicy($e, $attribute); + + $headers = $parameters['headers'] ?? []; + $attempts = (int)($headers['attempts'][0] ?? 0); + + if ($policy->isRetryable($e, $attempts) === false) { + throw $e; + } + + throw new RetryException( + reason: $e->getMessage(), + options: (new Options()) + ->withDelay($policy->getDelay($attempts)) + ->withHeader('attempts', (string)($attempts + 1)) + ); + } + } + + private function getRetryPolicy(\Throwable $exception, Attribute $attribute): RetryPolicy + { + if ($exception instanceof JobException && $exception->getPrevious() !== null) { + $exception = $exception->getPrevious(); + } + + $policy = $exception instanceof RetryableExceptionInterface ? $exception->getRetryPolicy() : null; + + return $policy ?? $attribute->getRetryPolicy(); + } +} diff --git a/src/Queue/src/RetryPolicy.php b/src/Queue/src/RetryPolicy.php new file mode 100644 index 000000000..5c35f25ef --- /dev/null +++ b/src/Queue/src/RetryPolicy.php @@ -0,0 +1,77 @@ +maxAttempts = $maxAttempts; + + if ($delay < 0) { + throw new InvalidArgumentException( + \sprintf('Delay must be greater than or equal to zero: `%s` given.', $delay) + ); + } + $this->delay = $delay; + + if ($multiplier < 1) { + throw new InvalidArgumentException( + \sprintf('Multiplier must be greater than zero: `%s` given.', $multiplier) + ); + } + $this->multiplier = $multiplier; + } + + /** + * @param positive-int|0 $attempts + * + * @return positive-int + */ + public function getDelay(int $attempts = 0): int + { + return (int) \ceil($this->delay * $this->multiplier ** $attempts); + } + + /** + * @param positive-int|0 $attempts + */ + public function isRetryable(\Throwable $exception, int $attempts = 0): bool + { + if ($exception instanceof JobException && $exception->getPrevious() !== null) { + $exception = $exception->getPrevious(); + } + + if (!$exception instanceof RetryableExceptionInterface || $this->maxAttempts === 0) { + return false; + } + + return $exception->isRetryable() && $attempts < $this->maxAttempts; + } +} diff --git a/src/Queue/src/RetryPolicyInterface.php b/src/Queue/src/RetryPolicyInterface.php new file mode 100644 index 000000000..e45a7850f --- /dev/null +++ b/src/Queue/src/RetryPolicyInterface.php @@ -0,0 +1,20 @@ +create(); + + $this->assertEquals($expected, $reader->firstClassMetadata(new \ReflectionClass($class), RetryPolicy::class)); + } + + public static function classesProvider(): \Traversable + { + yield [WithoutRetryPolicy::class, null]; + yield [WithDefaultRetryPolicyAnnotation::class, new RetryPolicy()]; + yield [WithDefaultRetryPolicyAttribute::class, new RetryPolicy()]; + yield [WithRetryPolicyAnnotation::class, new RetryPolicy(5, 3_000, 2.5)]; + yield [WithRetryPolicyAttribute::class, new RetryPolicy(5, 3_000, 2.5)]; + yield [WithExtendedRetryPolicyAttribute::class, new ExtendedRetryPolicy()]; + yield [WithExtendedRetryPolicyAnnotation::class, new ExtendedRetryPolicy()]; + } +} diff --git a/src/Queue/tests/Attribute/Stub/ExtendedRetryPolicy.php b/src/Queue/tests/Attribute/Stub/ExtendedRetryPolicy.php new file mode 100644 index 000000000..93c4dd8ac --- /dev/null +++ b/src/Queue/tests/Attribute/Stub/ExtendedRetryPolicy.php @@ -0,0 +1,23 @@ +retryable; + } + + public function getRetryPolicy(): ?RetryPolicyInterface + { + return $this->retryPolicy; + } +} diff --git a/src/Queue/tests/Interceptor/Consume/RetryPolicyInterceptorTest.php b/src/Queue/tests/Interceptor/Consume/RetryPolicyInterceptorTest.php new file mode 100644 index 000000000..1d4a7c934 --- /dev/null +++ b/src/Queue/tests/Interceptor/Consume/RetryPolicyInterceptorTest.php @@ -0,0 +1,170 @@ +reader = $this->createMock(ReaderInterface::class); + $this->core = $this->createMock(CoreInterface::class); + $this->interceptor = new RetryPolicyInterceptor($this->reader); + } + + public function testWithoutException(): void + { + $this->reader->expects($this->never())->method('firstClassMetadata'); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with('foo', 'bar', []) + ->willReturn('result'); + + $this->assertSame('result', $this->interceptor->process('foo', 'bar', [], $this->core)); + } + + public function testWithoutRetryPolicy(): void + { + $this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(null); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with(self::class, 'bar', []) + ->willThrowException(new \Exception('Something went wrong')); + + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Something went wrong'); + $this->interceptor->process(self::class, 'bar', [], $this->core); + } + + public function testNotRetryableException(): void + { + $this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(new RetryPolicy()); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with(self::class, 'bar', []) + ->willThrowException(new \Exception('Something went wrong')); + + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Something went wrong'); + $this->interceptor->process(self::class, 'bar', [], $this->core); + } + + public function testWithDefaultRetryPolicy(): void + { + $this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(new RetryPolicy()); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with(self::class, 'bar', []) + ->willThrowException(new TestRetryException()); + + try { + $this->interceptor->process(self::class, 'bar', [], $this->core); + } catch (RetryException $e) { + $this->assertSame(1, $e->getOptions()->getDelay()); + $this->assertSame(['attempts' => ['1']], $e->getOptions()->getHeaders()); + } + } + + public function testWithRetryPolicyInAttribute(): void + { + $this->reader->expects($this->once())->method('firstClassMetadata')->willReturn( + new RetryPolicy(maxAttempts: 3, delay: 4, multiplier: 2) + ); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]]) + ->willThrowException(new TestRetryException()); + + try { + $this->interceptor->process( + self::class, + 'bar', + ['headers' => ['attempts' => ['1']]], + $this->core + ); + } catch (RetryException $e) { + $this->assertSame(8, $e->getOptions()->getDelay()); + $this->assertSame(['attempts' => ['2']], $e->getOptions()->getHeaders()); + } + } + + public function testWithRetryPolicyInException(): void + { + $this->reader->expects($this->once())->method('firstClassMetadata')->willReturn( + new RetryPolicy(maxAttempts: 30, delay: 400, multiplier: 25) + ); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]]) + ->willThrowException(new TestRetryException( + retryPolicy: new \Spiral\Queue\RetryPolicy(maxAttempts: 3, delay: 4, multiplier: 2) + )); + + try { + $this->interceptor->process( + self::class, + 'bar', + ['headers' => ['attempts' => ['1']]], + $this->core + ); + } catch (RetryException $e) { + $this->assertSame(8, $e->getOptions()->getDelay()); + $this->assertSame(['attempts' => ['2']], $e->getOptions()->getHeaders()); + } + } + + public function testWithRetryPolicyInExceptionInsideJobException(): void + { + $this->reader->expects($this->once())->method('firstClassMetadata')->willReturn( + new RetryPolicy(maxAttempts: 30, delay: 400, multiplier: 25) + ); + + $this->core + ->expects($this->once()) + ->method('callAction') + ->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]]) + ->willThrowException(new JobException( + previous: new TestRetryException( + retryPolicy: new \Spiral\Queue\RetryPolicy(maxAttempts: 3, delay: 4, multiplier: 2) + ) + )); + + try { + $this->interceptor->process( + self::class, + 'bar', + ['headers' => ['attempts' => ['1']]], + $this->core + ); + } catch (RetryException $e) { + $this->assertSame(8, $e->getOptions()->getDelay()); + $this->assertSame(['attempts' => ['2']], $e->getOptions()->getHeaders()); + } + } +} diff --git a/src/Queue/tests/RetryPolicyTest.php b/src/Queue/tests/RetryPolicyTest.php new file mode 100644 index 000000000..d651e9961 --- /dev/null +++ b/src/Queue/tests/RetryPolicyTest.php @@ -0,0 +1,74 @@ +expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Maximum attempts must be greater than or equal to zero: `-1` given.'); + new RetryPolicy(-1, 0); + } + + public function testInvalidDelay(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Delay must be greater than or equal to zero: `-1` given.'); + new RetryPolicy(1, -1); + } + + public function testInvalidMultiplier(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Multiplier must be greater than zero: `-1` given.'); + new RetryPolicy(1, 1, -1); + } + + #[DataProvider('retryableDataProvider')] + public function testIsRetryable(\Throwable $exception, int $attempts, bool $expected): void + { + $policy = new RetryPolicy(1, 0); + + $this->assertSame($expected, $policy->isRetryable($exception, $attempts)); + } + + public function testGetDelayWithoutMultiplier(): void + { + $policy = new RetryPolicy(4, 1_000); + + $this->assertSame(1_000, $policy->getDelay()); + $this->assertSame(1_000, $policy->getDelay(1)); + $this->assertSame(1_000, $policy->getDelay(2)); + $this->assertSame(1_000, $policy->getDelay(3)); + $this->assertSame(1_000, $policy->getDelay(4)); + } + + public function testGetDelayWithMultiplier(): void + { + $policy = new RetryPolicy(4, 1_000, 2); + + $this->assertSame(1_000, $policy->getDelay()); + $this->assertSame(2_000, $policy->getDelay(1)); + $this->assertSame(4_000, $policy->getDelay(2)); + $this->assertSame(8_000, $policy->getDelay(3)); + $this->assertSame(16_000, $policy->getDelay(4)); + } + + public static function retryableDataProvider(): \Traversable + { + yield [new \DomainException(), 0, false]; + yield [new \DomainException(), 1, false]; + yield [new TestRetryException(), 0, true]; + yield [new TestRetryException(), 1, false]; + yield [new TestRetryException(false), 0, false]; + yield [new TestRetryException(false), 1, false]; + } +} diff --git a/tests/Framework/Bootloader/Queue/QueueBootloaderTest.php b/tests/Framework/Bootloader/Queue/QueueBootloaderTest.php index 11c1c1dc5..299f5b27b 100644 --- a/tests/Framework/Bootloader/Queue/QueueBootloaderTest.php +++ b/tests/Framework/Bootloader/Queue/QueueBootloaderTest.php @@ -83,6 +83,7 @@ public function testConfig(): void 'interceptors' => [ 'consume' => [ \Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor::class, + \Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor::class, ], 'push' => [] ],