Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement fastapi for http server #82

Merged
merged 21 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions docker-compose.example.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,50 @@
---

services:
server:
api:
build:
context: .
target: prod
command: python3 server.py
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
ports:
- 8000:8000
environment:
- RABBIT_USER=root
- RABBIT_PASSWORD=pass
- RABBIT_HOST=rabbitmq
- RABBIT_PORT=5672
- REDIS_PASSWORD=pass
- REDIS_HOST=redis
- REDIS_PORT=6379
volumes:
- ./:/project/
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
worker:
build:
context: .
target: prod
dockerfile: Dockerfile
environment:
- RABBIT_USER=root
- RABBIT_PASSWORD=pass
- RABBIT_HOST=rabbitmq
- RABBIT_PORT=5672
- REDIS_PASSWORD=pass
- REDIS_HOST=redis
- REDIS_PORT=6379
rabbitmq:
image: "rabbitmq:3-management-alpine"
environment:
- RABBITMQ_DEFAULT_USER=root
- RABBITMQ_DEFAULT_PASS=pass
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 2
start_period: 40s
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
ports:
- 5672:5672
redis:
image: bitnami/redis
environment:
- REDIS_PASSWORD=pass
8 changes: 8 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from fastapi import FastAPI
from routers.amqp import router as amqpRouter
from routers.http import router as httpRouter

app = FastAPI()

app.include_router(httpRouter)
app.include_router(amqpRouter)
8 changes: 6 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ python-dotenv==1.0.0
tc-hivemind-backend==1.2.2
llama-index-question-gen-guidance==0.1.2
llama-index-vector-stores-postgres==0.1.2
celery>=5.3.6, <6.0.0
celery[redis]>=5.3.6, <6.0.0
guidance==0.1.14
tc-messageBroker==1.6.6
tc-messageBroker==1.7.1
traceloop-sdk==0.14.1
backoff==2.2.1
fastapi[standard]==0.114.1
faststream==0.5.23
aio_pika==9.4.0
mongomock==4.2.0.post1
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
61 changes: 61 additions & 0 deletions routers/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from datetime import datetime

from faststream.rabbit import RabbitBroker
from faststream.rabbit.fastapi import Logger, RabbitRouter # type: ignore
from faststream.rabbit.schemas.queue import RabbitQueue
from pydantic import BaseModel
from schema import AMQPPayload, ResponseModel
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.credentials import load_rabbitmq_credentials
from utils.persist_payload import PersistPayload
from worker.tasks import query_data_sources

rabbitmq_creds = load_rabbitmq_credentials()

router = RabbitRouter(rabbitmq_creds["url"])
broker = RabbitBroker(url=rabbitmq_creds["url"])


class Payload(BaseModel):
event: str
date: datetime | str
content: AMQPPayload

amindadgar marked this conversation as resolved.
Show resolved Hide resolved

@router.subscriber(queue=RabbitQueue(name=Queue.HIVEMIND, durable=True))
async def ask(payload: Payload, logger: Logger):
if payload.event == Event.HIVEMIND.QUESTION_RECEIVED:
try:
question = payload.content.question.message
community_id = payload.content.communityId

logger.info(f"COMMUNITY_ID: {community_id} Received job")
response = query_data_sources(community_id=community_id, query=question)
logger.info(f"COMMUNITY_ID: {community_id} Job finished")

response_payload = AMQPPayload(
communityId=community_id,
route=payload.content.route,
question=payload.content.question,
response=ResponseModel(message=response),
metadata=payload.content.metadata,
)
# dumping the whole payload of question & answer to db
persister = PersistPayload()
persister.persist_amqp(response_payload)

result = Payload(
event=payload.content.route.destination.event,
date=str(datetime.now()),
content=response_payload.model_dump(),
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
)
await broker.publish(
message=result, queue=payload.content.route.destination.queue
)
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
logger.exception(f"Errors While processing job! {e}")
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
else:
logger.error(
f"No such `{payload.event}` event available for {Queue.HIVEMIND} queue!"
)
52 changes: 52 additions & 0 deletions routers/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from celery.result import AsyncResult
from fastapi import APIRouter
from pydantic import BaseModel
from schema import HTTPPayload, QuestionModel, ResponseModel
from utils.persist_payload import PersistPayload
from worker.tasks import ask_question_auto_search


class RequestPayload(BaseModel):
question: QuestionModel
communityId: str


router = APIRouter()


@router.post("/ask")
async def ask(payload: RequestPayload):
query = payload.question.message
community_id = payload.communityId
task = ask_question_auto_search.delay(
community_id=community_id,
query=query,
)
payload_http = HTTPPayload(
communityId=community_id,
question=payload.question,
taskId=task.id,
)
# persisting the payload
persister = PersistPayload()
persister.persist_http(payload_http)

return {"id": task.id}


@router.get("/status")
async def status(task_id: str):
task = AsyncResult(task_id)

# persisting the data updates in db
persister = PersistPayload()

http_payload = HTTPPayload(
communityId=task.result["community_id"],
question=QuestionModel(message=task.result["question"]),
response=ResponseModel(message=task.result["response"]),
taskId=task.id,
)
persister.persist_http(http_payload, update=True)

return {"id": task.id, "status": task.status, "result": task.result}
1 change: 1 addition & 0 deletions schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .payload import AMQPPayload, HTTPPayload, QuestionModel, ResponseModel
35 changes: 35 additions & 0 deletions schema/payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pydantic import BaseModel


class DestinationModel(BaseModel):
queue: str
event: str


class RouteModel(BaseModel):
source: str
destination: DestinationModel | None


class QuestionModel(BaseModel):
message: str
filters: dict | None = None


class ResponseModel(BaseModel):
message: str


class AMQPPayload(BaseModel):
communityId: str
route: RouteModel
question: QuestionModel
response: ResponseModel | None = None
metadata: dict | None


class HTTPPayload(BaseModel):
communityId: str
question: QuestionModel
response: ResponseModel | None = None
taskId: str
70 changes: 0 additions & 70 deletions server.py

This file was deleted.

26 changes: 26 additions & 0 deletions test_rb_send_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from tc_messageBroker import RabbitMQ
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue

if __name__ == "__main__":
broker_url = "localhost"
port = 5672
username = "root"
password = "pass"

rabbit_mq = RabbitMQ(
broker_url=broker_url, port=port, username=username, password=password
)
amindadgar marked this conversation as resolved.
Show resolved Hide resolved

rabbit_mq.connect(Queue.HIVEMIND, queue_durable=False)

content = {
"community_id": "****",
"question": "What is Hivemind?",
}

rabbit_mq.publish(
Queue.HIVEMIND,
event=Event.DISCORD_ANALYZER.RUN,
content=content,
)
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading