From cc30c4b257e1d517f353d6c65074aa5d8c908270 Mon Sep 17 00:00:00 2001 From: Gabo Date: Tue, 10 Dec 2024 14:11:17 +0100 Subject: [PATCH] Update version --- pyproject.toml | 2 +- src/queue_processor/QueueProcessor.py | 30 ++++++++++----------------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 300bed9..5da9d64 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "queue-processor" -version = "2024.12.2.1" +version = "2024.12.10.1" description = "Manage queues on Uwazi services" license = { file = "LICENSE" } authors = [{ name = "HURIDOCS" }] diff --git a/src/queue_processor/QueueProcessor.py b/src/queue_processor/QueueProcessor.py index 980a892..7aa02f7 100644 --- a/src/queue_processor/QueueProcessor.py +++ b/src/queue_processor/QueueProcessor.py @@ -46,32 +46,28 @@ def create_queues(self): self.queue_processor_logger.info(f"Creating queue {queue_name}") self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute() - def start(self, process: callable, restart_condition: Callable = None): + def start(self, process: callable, hide_message_seconds: int = 0): self.queue_processor_logger.info("QueueProcessor running") while True: - restart = False for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names): try: self.create_queues() task_queue = self.get_queue(task_queue_name) - message = task_queue.receiveMessage().execute() - task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute() - message = utils.decode_message(message["message"]) - results = process(message) + message = task_queue.receiveMessage(vt=hide_message_seconds if hide_message_seconds else None).execute() + + if hide_message_seconds == 0: + task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute() + + results = process(utils.decode_message(message["message"])) if not results: continue - self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message( - results - ).execute() + if hide_message_seconds: + task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute() - try: - restart = restart_condition(message) - except: - pass - - break + results_queue = self.get_queue(results_queue_name) + results_queue.sendMessage(delay=self.delay_time_for_results).message(results).execute() except NoMessageInQueue: sleep(2) @@ -83,7 +79,3 @@ def start(self, process: callable, restart_condition: Callable = None): self.exists_queues = False self.queue_processor_logger.error(f"Error: {e}", exc_info=True) sleep(60) - - if restart: - sleep(self.delay_time_for_results + 5) - break