diff --git a/.github/workflows/production.yml b/.github/workflows/production.yml index 940fb0e..39d3f14 100644 --- a/.github/workflows/production.yml +++ b/.github/workflows/production.yml @@ -1,3 +1,5 @@ +--- + name: Production CI/CD Pipeline on: diff --git a/.github/workflows/start.staging.yml b/.github/workflows/start.staging.yml index 24e53da..a53de6e 100644 --- a/.github/workflows/start.staging.yml +++ b/.github/workflows/start.staging.yml @@ -1,3 +1,5 @@ +--- + name: Staging CI/CD Pipeline on: pull_request diff --git a/.gitignore b/.gitignore index 32ea46e..36af5d5 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ coverage.xml .hypothesis/ .pytest_cache/ cover/ +coverage/ # Translations *.mo diff --git a/.yamllint b/.yamllint new file mode 100644 index 0000000..dd74efc --- /dev/null +++ b/.yamllint @@ -0,0 +1,7 @@ +rules: + brackets: + forbid: false + min-spaces-inside: 1 + max-spaces-inside: 1 + min-spaces-inside-empty: -1 + max-spaces-inside-empty: -1 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index a41cfbe..00e5a00 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,4 +9,4 @@ RUN chmod +x docker-entrypoint.sh CMD ["./docker-entrypoint.sh"] FROM base AS prod -CMD ["python3", "-m", "celery", "-A", "celery_app.server", "worker", "-l", "INFO"] +CMD ["celery", "-A", "worker", "worker", "-l", "INFO"] diff --git a/docker-compose.example.yml b/docker-compose.example.yml index 0eccf2e..aa671f8 100644 --- a/docker-compose.example.yml +++ b/docker-compose.example.yml @@ -1,14 +1,13 @@ -version: "3.9" +--- services: server: build: context: . target: prod - dockerfile: Dockerfile + command: python3 server.py worker: build: context: . target: prod dockerfile: Dockerfile - command: python3 worker.py diff --git a/docker-compose.test.yml b/docker-compose.test.yml index e1a1fb7..9325472 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -1,4 +1,4 @@ -version: "3.9" +--- services: app: @@ -55,7 +55,7 @@ services: - NEO4J_PLUGINS=["apoc", "graph-data-science"] - NEO4J_dbms_security_procedures_unrestricted=apoc.*,gds.* healthcheck: - test: ["CMD", "wget", "http://localhost:7474"] + test: [ "CMD", "wget", "http://localhost:7474" ] interval: 1m30s timeout: 10s retries: 2 @@ -77,7 +77,7 @@ services: - POSTGRES_USER=root - POSTGRES_PASSWORD=pass healthcheck: - test: ["CMD-SHELL", "pg_isready"] + test: [ "CMD-SHELL", "pg_isready" ] interval: 10s timeout: 5s retries: 5 @@ -105,12 +105,17 @@ services: qdrant-healthcheck: restart: always image: curlimages/curl:latest - entrypoint: ["/bin/sh", "-c", "--", "while true; do sleep 30; done;"] + entrypoint: + [ + "/bin/sh", + "-c", + "--", + "while true; do sleep 30; done;" + ] depends_on: - qdrant healthcheck: - test: ["CMD", "curl", "-f", "http://qdrant:6333/readyz"] + test: [ "CMD", "curl", "-f", "http://qdrant:6333/readyz" ] interval: 10s timeout: 2s retries: 5 - diff --git a/worker.py b/server.py similarity index 97% rename from worker.py rename to server.py index 8224baf..35c9cc2 100644 --- a/worker.py +++ b/server.py @@ -3,7 +3,6 @@ from typing import Any import backoff -from celery_app.tasks import ask_question_auto_search from pika.exceptions import ConnectionClosedByBroker from tc_messageBroker import RabbitMQ from tc_messageBroker.rabbit_mq.event import Event @@ -13,6 +12,7 @@ from tc_messageBroker.rabbit_mq.queue import Queue from utils.credentials import load_rabbitmq_credentials from utils.fetch_community_id import fetch_community_id_by_guild_id +from worker.tasks import ask_question_auto_search def query_llm(recieved_data: dict[str, Any]): diff --git a/utils/traceloop.py b/utils/traceloop.py new file mode 100644 index 0000000..9704719 --- /dev/null +++ b/utils/traceloop.py @@ -0,0 +1,16 @@ +import logging +import os + +from dotenv import load_dotenv +from traceloop.sdk import Traceloop + +load_dotenv() + + +def init_tracing(): + otel_endpoint = os.getenv("TRACELOOP_BASE_URL") + if not otel_endpoint: + logging.error("TRACELOOP_BASE_URL is not set.") + return + Traceloop.init(app_name="hivemind-worker", api_endpoint=otel_endpoint) + logging.info("Traceloop initialized.") diff --git a/celery_app/__init__.py b/worker/__init__.py similarity index 100% rename from celery_app/__init__.py rename to worker/__init__.py diff --git a/celery_app/server.py b/worker/celery.py similarity index 60% rename from celery_app/server.py rename to worker/celery.py index 499aff0..bb67f4b 100644 --- a/celery_app/server.py +++ b/worker/celery.py @@ -7,5 +7,12 @@ host = rabbit_creds["host"] port = rabbit_creds["port"] -app = Celery("celery_app/tasks", broker=f"pyamqp://{user}:{password}@{host}:{port}//") -app.autodiscover_tasks(["celery_app"]) +app = Celery( + "tasks", + broker=f"pyamqp://{user}:{password}@{host}:{port}//", + include=["worker.tasks"], +) + + +if __name__ == "__main__": + app.start() diff --git a/celery_app/tasks.py b/worker/tasks.py similarity index 92% rename from celery_app/tasks.py rename to worker/tasks.py index 8667231..d949b96 100644 --- a/celery_app/tasks.py +++ b/worker/tasks.py @@ -1,13 +1,9 @@ import gc import json import logging -import os from typing import Any -from celery.signals import task_postrun -from celery_app.server import app -from celery_app.utils.fire_event import job_send -from dotenv import load_dotenv +from celery.signals import task_postrun, task_prerun from subquery import query_multiple_source from tc_messageBroker.rabbit_mq.event import Event from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import ( @@ -18,8 +14,10 @@ ) from tc_messageBroker.rabbit_mq.payload.payload import Payload from tc_messageBroker.rabbit_mq.queue import Queue -from traceloop.sdk import Traceloop from utils.data_source_selector import DataSourceSelector +from utils.traceloop import init_tracing +from worker.celery import app +from worker.utils.fire_event import job_send @app.task @@ -46,9 +44,6 @@ def ask_question_auto_search( - `date` - `content`: which is the `ChatInputCommandInteraction` as a dictionary """ - load_dotenv() - otel_endpoint = os.getenv("TRACELOOP_BASE_URL") - Traceloop.init(app_name="hivemind-server", api_endpoint=otel_endpoint) prefix = f"COMMUNITY_ID: {community_id} | " logging.info(f"{prefix}Processing question!") @@ -124,6 +119,12 @@ def ask_question_auto_search( ) +@task_prerun.connect +def task_prerun_handler(sender=None, **kwargs): + # Initialize Traceloop for LLM + init_tracing() + + @task_postrun.connect def task_postrun_handler(sender=None, **kwargs): # Trigger garbage collection after each task diff --git a/celery_app/utils/__init__.py b/worker/utils/__init__.py similarity index 100% rename from celery_app/utils/__init__.py rename to worker/utils/__init__.py diff --git a/celery_app/utils/fire_event.py b/worker/utils/fire_event.py similarity index 100% rename from celery_app/utils/fire_event.py rename to worker/utils/fire_event.py