diff --git a/src/WorkerBundle/Commands/DispatchingConsumerCommand.php b/src/WorkerBundle/Commands/DispatchingConsumerCommand.php index 1cf7d41..bf35935 100644 --- a/src/WorkerBundle/Commands/DispatchingConsumerCommand.php +++ b/src/WorkerBundle/Commands/DispatchingConsumerCommand.php @@ -12,8 +12,10 @@ namespace Alchemy\WorkerBundle\Commands; use Alchemy\Worker\MessageDispatcher; +use Alchemy\Worker\WorkerInvoker; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; class DispatchingConsumerCommand extends Command @@ -23,21 +25,29 @@ class DispatchingConsumerCommand extends Command */ private $messageDispatcher; + /** + * @var WorkerInvoker + */ + private $workerInvoker; + /** * @param MessageDispatcher $messageDispatcher + * @param WorkerInvoker $workerInvoker */ - public function __construct(MessageDispatcher $messageDispatcher) + public function __construct(MessageDispatcher $messageDispatcher, WorkerInvoker $workerInvoker) { parent::__construct(); $this->messageDispatcher = $messageDispatcher; + $this->workerInvoker = $workerInvoker; } protected function configure() { parent::configure(); - $this->setName('workers:run-dispatcher'); + $this->setName('workers:run-dispatcher') + ->addOption('preserve-payload', 'p', InputOption::VALUE_NONE); } /** @@ -47,6 +57,10 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { + if ($input->getOption('preserve-payload')) { + $this->workerInvoker->preservePayloads(); + } + while (true) { $this->messageDispatcher->run(); } diff --git a/src/WorkerBundle/Commands/InvokeWorkerCommand.php b/src/WorkerBundle/Commands/InvokeWorkerCommand.php index a4870c0..3dac02c 100644 --- a/src/WorkerBundle/Commands/InvokeWorkerCommand.php +++ b/src/WorkerBundle/Commands/InvokeWorkerCommand.php @@ -14,6 +14,7 @@ use Alchemy\Worker\WorkerResolver; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; class InvokeWorkerCommand extends Command @@ -40,7 +41,8 @@ protected function configure() $this->setName('workers:run-worker') ->addArgument('type') - ->addArgument('body'); + ->addArgument('body') + ->addOption('preserve-payload', 'p', InputOption::VALUE_NONE); } protected function execute(InputInterface $input, OutputInterface $output) @@ -66,6 +68,8 @@ protected function execute(InputInterface $input, OutputInterface $output) $worker->process($body); - unlink($input->getArgument('body')); + if (! $input->getOption('preserve-payload')) { + unlink($input->getArgument('body')); + } } } diff --git a/src/WorkerBundle/Resources/config/services.yml b/src/WorkerBundle/Resources/config/services.yml index a2e3534..ada6ade 100644 --- a/src/WorkerBundle/Resources/config/services.yml +++ b/src/WorkerBundle/Resources/config/services.yml @@ -42,6 +42,7 @@ services: class: Alchemy\WorkerBundle\Commands\DispatchingConsumerCommand arguments: - "@alchemy_worker.message_dispatcher" + - "@alchemy_worker.worker_invoker" tags: - { name: console.command }