diff --git a/setup.py b/setup.py index 549420d..dbbf0e4 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ name=PROJECT_NAME, packages=["queue_processor"], package_dir={"": "src"}, - version="0.2", + version="0.3", url="https://github.com/huridocs/queue-processor", author="HURIDOCS", description="Manage queues on Uwazi services", diff --git a/src/queue_processor/QueueProcessor.py b/src/queue_processor/QueueProcessor.py index d247148..a4a5007 100644 --- a/src/queue_processor/QueueProcessor.py +++ b/src/queue_processor/QueueProcessor.py @@ -43,20 +43,16 @@ def start(self, process: callable): for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names): try: self.create_queues() - self.queue_processor_logger.info(f"Processing queue: {task_queue_name}") task_queue = self.get_queue(task_queue_name) message = task_queue.receiveMessage().execute() task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute() + results = process(utils.decode_message(message["message"])) - if "message" in message: - message["message"] = utils.decode_message(message["message"]) + if results: + self.get_queue(results_queue_name).sendMessage().message(results).execute() + break - results = process(message["message"]) - self.queue_processor_logger.info(f"Sending results to queue: {results_queue_name}") - self.get_queue(results_queue_name).sendMessage().message(results).execute() - break except NoMessageInQueue: - self.queue_processor_logger.info("No messages in queue") sleep(2) except redis.exceptions.ConnectionError: self.exists_queues = False diff --git a/src/service_mock.py b/src/service_mock.py index 22e64f9..f10cee4 100644 --- a/src/service_mock.py +++ b/src/service_mock.py @@ -1,7 +1,10 @@ from src.queue_processor.QueueProcessor import QueueProcessor -def process(message: dict[str, any]) -> dict[str, any]: +def process(message: dict[str, any]) -> dict[str, any] | None: + if "required_field" not in message: + return None + message["processed"] = True return message diff --git a/src/tests/test_queue_processor.py b/src/tests/test_queue_processor.py index 471917f..b9580dc 100644 --- a/src/tests/test_queue_processor.py +++ b/src/tests/test_queue_processor.py @@ -19,11 +19,13 @@ def test_two_queues(self): sleep(3) - queue_tasks_1.sendMessage().message({"test": "test_1"}).execute() - queue_tasks_1.sendMessage().message({"test": "test_2"}).execute() - queue_tasks_2.sendMessage().message({"test": "test_3"}).execute() + queue_tasks_1.sendMessage().message({"test": "test_0"}).execute() + queue_tasks_1.sendMessage().message({"required_field": True, "test": "test_1"}).execute() + queue_tasks_1.sendMessage().message({"required_field": True, "test": "test_2"}).execute() + queue_tasks_2.sendMessage().message({"test": "test_0"}).execute() + queue_tasks_2.sendMessage().message({"required_field": True, "test": "test_3"}).execute() - sleep(3) + sleep(5) result_message_1 = utils.decode_message(queue_results_1.receiveMessage().execute()["message"]) result_message_2 = utils.decode_message(queue_results_1.receiveMessage().execute()["message"])