Skip to content

Commit

Permalink
Merge pull request #80 from TogetherCrew/fix-memory
Browse files Browse the repository at this point in the history
changes server and worker
  • Loading branch information
cyri113 authored Jun 29, 2024
2 parents 9e9e7ec + 50d95e8 commit d8038ed
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/production.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
---

name: Production CI/CD Pipeline

on:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/start.staging.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
---

name: Staging CI/CD Pipeline

on: pull_request
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
cover/
coverage/

# Translations
*.mo
Expand Down
7 changes: 7 additions & 0 deletions .yamllint
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
rules:
brackets:
forbid: false
min-spaces-inside: 1
max-spaces-inside: 1
min-spaces-inside-empty: -1
max-spaces-inside-empty: -1
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ RUN chmod +x docker-entrypoint.sh
CMD ["./docker-entrypoint.sh"]

FROM base AS prod
CMD ["python3", "-m", "celery", "-A", "celery_app.server", "worker", "-l", "INFO"]
CMD ["celery", "-A", "worker", "worker", "-l", "INFO"]
5 changes: 2 additions & 3 deletions docker-compose.example.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
version: "3.9"
---

services:
server:
build:
context: .
target: prod
dockerfile: Dockerfile
command: python3 server.py
worker:
build:
context: .
target: prod
dockerfile: Dockerfile
command: python3 worker.py
17 changes: 11 additions & 6 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: "3.9"
---

services:
app:
Expand Down Expand Up @@ -55,7 +55,7 @@ services:
- NEO4J_PLUGINS=["apoc", "graph-data-science"]
- NEO4J_dbms_security_procedures_unrestricted=apoc.*,gds.*
healthcheck:
test: ["CMD", "wget", "http://localhost:7474"]
test: [ "CMD", "wget", "http://localhost:7474" ]
interval: 1m30s
timeout: 10s
retries: 2
Expand All @@ -77,7 +77,7 @@ services:
- POSTGRES_USER=root
- POSTGRES_PASSWORD=pass
healthcheck:
test: ["CMD-SHELL", "pg_isready"]
test: [ "CMD-SHELL", "pg_isready" ]
interval: 10s
timeout: 5s
retries: 5
Expand Down Expand Up @@ -105,12 +105,17 @@ services:
qdrant-healthcheck:
restart: always
image: curlimages/curl:latest
entrypoint: ["/bin/sh", "-c", "--", "while true; do sleep 30; done;"]
entrypoint:
[
"/bin/sh",
"-c",
"--",
"while true; do sleep 30; done;"
]
depends_on:
- qdrant
healthcheck:
test: ["CMD", "curl", "-f", "http://qdrant:6333/readyz"]
test: [ "CMD", "curl", "-f", "http://qdrant:6333/readyz" ]
interval: 10s
timeout: 2s
retries: 5

2 changes: 1 addition & 1 deletion worker.py → server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Any

import backoff
from celery_app.tasks import ask_question_auto_search
from pika.exceptions import ConnectionClosedByBroker
from tc_messageBroker import RabbitMQ
from tc_messageBroker.rabbit_mq.event import Event
Expand All @@ -13,6 +12,7 @@
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.credentials import load_rabbitmq_credentials
from utils.fetch_community_id import fetch_community_id_by_guild_id
from worker.tasks import ask_question_auto_search


def query_llm(recieved_data: dict[str, Any]):
Expand Down
16 changes: 16 additions & 0 deletions utils/traceloop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging
import os

from dotenv import load_dotenv
from traceloop.sdk import Traceloop

load_dotenv()


def init_tracing():
otel_endpoint = os.getenv("TRACELOOP_BASE_URL")
if not otel_endpoint:
logging.error("TRACELOOP_BASE_URL is not set.")
return
Traceloop.init(app_name="hivemind-worker", api_endpoint=otel_endpoint)
logging.info("Traceloop initialized.")
File renamed without changes.
11 changes: 9 additions & 2 deletions celery_app/server.py → worker/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,12 @@
host = rabbit_creds["host"]
port = rabbit_creds["port"]

app = Celery("celery_app/tasks", broker=f"pyamqp://{user}:{password}@{host}:{port}//")
app.autodiscover_tasks(["celery_app"])
app = Celery(
"tasks",
broker=f"pyamqp://{user}:{password}@{host}:{port}//",
include=["worker.tasks"],
)


if __name__ == "__main__":
app.start()
19 changes: 10 additions & 9 deletions celery_app/tasks.py → worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import gc
import json
import logging
import os
from typing import Any

from celery.signals import task_postrun
from celery_app.server import app
from celery_app.utils.fire_event import job_send
from dotenv import load_dotenv
from celery.signals import task_postrun, task_prerun
from subquery import query_multiple_source
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import (
Expand All @@ -18,8 +14,10 @@
)
from tc_messageBroker.rabbit_mq.payload.payload import Payload
from tc_messageBroker.rabbit_mq.queue import Queue
from traceloop.sdk import Traceloop
from utils.data_source_selector import DataSourceSelector
from utils.traceloop import init_tracing
from worker.celery import app
from worker.utils.fire_event import job_send


@app.task
Expand All @@ -46,9 +44,6 @@ def ask_question_auto_search(
- `date`
- `content`: which is the `ChatInputCommandInteraction` as a dictionary
"""
load_dotenv()
otel_endpoint = os.getenv("TRACELOOP_BASE_URL")
Traceloop.init(app_name="hivemind-server", api_endpoint=otel_endpoint)

prefix = f"COMMUNITY_ID: {community_id} | "
logging.info(f"{prefix}Processing question!")
Expand Down Expand Up @@ -124,6 +119,12 @@ def ask_question_auto_search(
)


@task_prerun.connect
def task_prerun_handler(sender=None, **kwargs):
# Initialize Traceloop for LLM
init_tracing()


@task_postrun.connect
def task_postrun_handler(sender=None, **kwargs):
# Trigger garbage collection after each task
Expand Down
File renamed without changes.
File renamed without changes.

0 comments on commit d8038ed

Please sign in to comment.