diff --git a/databox/api/src/Integration/Phrasea/Expose/ExposeSynchronizer.php b/databox/api/src/Integration/Phrasea/Expose/ExposeSynchronizer.php index 19c912209..833ccb1c1 100644 --- a/databox/api/src/Integration/Phrasea/Expose/ExposeSynchronizer.php +++ b/databox/api/src/Integration/Phrasea/Expose/ExposeSynchronizer.php @@ -2,6 +2,7 @@ namespace App\Integration\Phrasea\Expose; +use Alchemy\CoreBundle\Lock\LockTrait; use App\Entity\Basket\Basket; use App\Entity\Integration\IntegrationData; use App\Integration\IntegrationManager; @@ -11,6 +12,7 @@ final class ExposeSynchronizer { use PusherTrait; + use LockTrait; public function __construct( private readonly ExposeClient $exposeClient, @@ -20,6 +22,16 @@ public function __construct( } public function synchronize(IntegrationData $basketData): void + { + $this->executeWithLock( + 'sync:'.$basketData->getId(), + 30, + 'synchronize', + fn() => $this->doSynchronize($basketData) + ); + } + + private function doSynchronize(IntegrationData $basketData): void { $config = $this->integrationManager->getIntegrationConfiguration($basketData->getIntegration()); $token = $this->integrationTokenRepository->getLastValidUserToken($config->getIntegrationId(), $basketData->getUserId()); diff --git a/databox/api/src/Integration/Phrasea/Expose/Message/SyncBasketHandler.php b/databox/api/src/Integration/Phrasea/Expose/Message/SyncBasketHandler.php index 83ecc494a..7fb655dc2 100644 --- a/databox/api/src/Integration/Phrasea/Expose/Message/SyncBasketHandler.php +++ b/databox/api/src/Integration/Phrasea/Expose/Message/SyncBasketHandler.php @@ -4,7 +4,10 @@ use App\Integration\IntegrationDataManager; use App\Integration\Phrasea\Expose\ExposeSynchronizer; +use Symfony\Component\HttpKernel\Exception\TooManyRequestsHttpException; use Symfony\Component\Messenger\Attribute\AsMessageHandler; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; #[AsMessageHandler] final readonly class SyncBasketHandler @@ -12,12 +15,21 @@ public function __construct( private IntegrationDataManager $integrationDataManager, private ExposeSynchronizer $exposeSynchronizer, + private MessageBusInterface $bus, ) { } public function __invoke(SyncBasket $message): void { $integrationData = $this->integrationDataManager->getByIdTrusted($message->getId()); - $this->exposeSynchronizer->synchronize($integrationData); + try { + $this->exposeSynchronizer->synchronize($integrationData); + } catch (TooManyRequestsHttpException $e) { + $delay = $e->getHeaders()['Retry-After'] ?? 300; + + $this->bus->dispatch($message, [ + new DelayStamp($delay * 1000), + ]); + } } } diff --git a/lib/php/core-bundle/Lock/LockTrait.php b/lib/php/core-bundle/Lock/LockTrait.php new file mode 100644 index 000000000..e438c6c41 --- /dev/null +++ b/lib/php/core-bundle/Lock/LockTrait.php @@ -0,0 +1,33 @@ +lockFactory = $lockFactory; + } + + public function executeWithLock(string $resource, int $retryAfter, string $message, callable $callback): mixed + { + $lock = $this->lockFactory->createLock($resource, ttl: $retryAfter); + + if (!$lock->acquire()) { + throw new TooManyRequestsHttpException($retryAfter, $message); + } + + try { + return $callback(); + } finally { + $lock->release(); + } + } +}