From aac6f34134e4395500730d2725793f3cc7391747 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Thu, 7 Nov 2024 13:52:26 +0330 Subject: [PATCH 1/2] fix: rabbitMQ connection was not initialized! --- routers/amqp.py | 7 ++++--- worker/utils/fire_event.py | 36 +++++++----------------------------- 2 files changed, 11 insertions(+), 32 deletions(-) diff --git a/routers/amqp.py b/routers/amqp.py index c8da60c..a738f03 100644 --- a/routers/amqp.py +++ b/routers/amqp.py @@ -10,11 +10,11 @@ from utils.credentials import load_rabbitmq_credentials from utils.persist_payload import PersistPayload from worker.tasks import query_data_sources +from worker.utils.fire_event import job_send rabbitmq_creds = load_rabbitmq_credentials() router = RabbitRouter(rabbitmq_creds["url"]) -broker = RabbitBroker(url=rabbitmq_creds["url"]) class Payload(BaseModel): @@ -50,8 +50,9 @@ async def ask(payload: Payload, logger: Logger): date=str(datetime.now()), content=response_payload.model_dump(), ) - await broker.publish( - message=result, queue=payload.content.route.destination.queue + job_send( + message=result, + queue_name=payload.content.route.destination.queue, ) except Exception as e: logger.exception(f"Errors While processing job! {e}") diff --git a/worker/utils/fire_event.py b/worker/utils/fire_event.py index 1cf7830..61ecece 100644 --- a/worker/utils/fire_event.py +++ b/worker/utils/fire_event.py @@ -1,43 +1,21 @@ -import logging from typing import Any +from faststream.rabbit import RabbitBroker -from tc_messageBroker import RabbitMQ from utils.credentials import load_rabbitmq_credentials -def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None: +async def job_send(message: Any, queue_name: str) -> None: """ fire the data to a specific event on a specific queue Parameters ----------- - event : str - the event to fire message to + message : Any + the message to be sent queue_name : str the queue to fire message on - content : dict[str, Any] - the content to send messages to """ - logging.info(f"IN JOB_SEND!, event: {event}") + rabbitmq_creds = load_rabbitmq_credentials() + broker = RabbitBroker(url=rabbitmq_creds["url"]) - rabbit_creds = load_rabbitmq_credentials() - username = rabbit_creds["user"] - password = rabbit_creds["password"] - broker_url = rabbit_creds["host"] - port = rabbit_creds["port"] - rabbit_mq = RabbitMQ( - broker_url=broker_url, port=port, username=username, password=password - ) - logging.info("Connecting to rabbitMQ!") - rabbit_mq.connect(queue_name) - logging.info("Trying to publish on rabbitMQ") - rabbit_mq.publish( - queue_name=queue_name, - event=event, - content=content, - ) - logging.info("Published to RabbitMQ!") - try: - rabbit_mq.connection.close() - except Exception as e: - logging.error(f"Failed to close RabbitMQ connection: {e}") + await broker.publish(message=message, queue=queue_name) From 50606d6a81987e58a12ff27757db4d03333feb72 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Thu, 7 Nov 2024 14:02:32 +0330 Subject: [PATCH 2/2] feat: using the shared lib to fire event! it was supporting the singleton logic. --- routers/amqp.py | 13 +++++++------ worker/utils/fire_event.py | 33 +++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/routers/amqp.py b/routers/amqp.py index a738f03..f4f45eb 100644 --- a/routers/amqp.py +++ b/routers/amqp.py @@ -45,14 +45,15 @@ async def ask(payload: Payload, logger: Logger): persister = PersistPayload() persister.persist_amqp(response_payload) - result = Payload( - event=payload.content.route.destination.event, - date=str(datetime.now()), - content=response_payload.model_dump(), - ) + # result = Payload( + # event=payload.content.route.destination.event, + # date=str(datetime.now()), + # content=response_payload.model_dump(), + # ) job_send( - message=result, + event=payload.content.route.destination.event, queue_name=payload.content.route.destination.queue, + content=response_payload.model_dump(), ) except Exception as e: logger.exception(f"Errors While processing job! {e}") diff --git a/worker/utils/fire_event.py b/worker/utils/fire_event.py index 61ecece..1ca4295 100644 --- a/worker/utils/fire_event.py +++ b/worker/utils/fire_event.py @@ -1,21 +1,38 @@ +import logging from typing import Any -from faststream.rabbit import RabbitBroker +from tc_messageBroker import RabbitMQ from utils.credentials import load_rabbitmq_credentials -async def job_send(message: Any, queue_name: str) -> None: +def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None: """ fire the data to a specific event on a specific queue Parameters ----------- - message : Any - the message to be sent + event : str + the event to fire message to queue_name : str the queue to fire message on + content : dict[str, Any] + the content to send messages to """ - rabbitmq_creds = load_rabbitmq_credentials() - broker = RabbitBroker(url=rabbitmq_creds["url"]) - - await broker.publish(message=message, queue=queue_name) + rabbit_creds = load_rabbitmq_credentials() + username = rabbit_creds["user"] + password = rabbit_creds["password"] + broker_url = rabbit_creds["host"] + port = rabbit_creds["port"] + rabbit_mq = RabbitMQ( + broker_url=broker_url, port=port, username=username, password=password + ) + rabbit_mq.connect(queue_name) + rabbit_mq.publish( + queue_name=queue_name, + event=event, + content=content, + ) + try: + rabbit_mq.connection.close() + except Exception as e: + logging.error(f"Failed to close RabbitMQ connection: {e}")