Skip to content

Commit

Permalink
Merge branch 'main' into feat/discourse-platform-engine
Browse files Browse the repository at this point in the history
  • Loading branch information
amindadgar committed Jan 29, 2024
2 parents b3a4a7d + 05198ba commit 8f98b46
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ RUN chmod +x docker-entrypoint.sh
CMD ["./docker-entrypoint.sh"]

FROM base AS prod
CMD ["python3", "celery", "-A", "celery_app.server", "worker", "-l", "INFO"]
CMD ["python3", "-m", "celery", "-A", "celery_app.server", "worker", "-l", "INFO"]
79 changes: 47 additions & 32 deletions celery_app/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import logging
import os
from typing import Any

from celery_app.server import app
from celery_app.utils.fire_event import job_send
from dotenv import load_dotenv
from subquery import query_multiple_source
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import (
Expand All @@ -12,18 +14,16 @@
from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import (
ChatInputCommandInteraction,
)
from tc_messageBroker.rabbit_mq.payload.discord_bot.interaction_response import (
InteractionResponse,
)
from tc_messageBroker.rabbit_mq.payload.payload import Payload
from tc_messageBroker.rabbit_mq.queue import Queue
from traceloop.sdk import Traceloop


@app.task
def ask_question_auto_search(
question: str,
community_id: str,
bot_given_info: ChatInputCommandInteraction,
bot_given_info: dict[str, Any],
) -> None:
"""
this task is for the case that the user asks a question
Expand All @@ -38,48 +38,63 @@ def ask_question_auto_search(
the community that the question was asked in
bot_given_info : tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction.ChatInputCommandInteraction
the information data that needed to be sent back to the bot again.
This would be the `ChatInputCommandInteraction`.
This would be a dictionary representing the keys
- `event`
- `date`
- `content`: which is the `ChatInputCommandInteraction` as a dictionary
"""
load_dotenv()
otel_endpoint = os.getenv("TRACELOOP_BASE_URL")
Traceloop.init(app_name="hivemind-server", api_endpoint=otel_endpoint)

prefix = f"COMMUNITY_ID: {community_id} | "
logging.info(f"{prefix}Processing question!")
create_interaction_content = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Create(
interaction=bot_given_info.to_dict(),
data=InteractionResponse(
type=4,
data=InteractionCallbackData(
content="Processing your question ...", flags=64
),
),
).to_dict()

logging.info(f"{prefix}Sending process question to discord-bot!")
job_send(
event=Event.DISCORD_BOT.INTERACTION_RESPONSE.CREATE,
queue_name=Queue.DISCORD_BOT,
content=create_interaction_content,
)
interaction = json.loads(bot_given_info["content"]["interaction"])
chat_input_interaction = ChatInputCommandInteraction.from_dict(interaction)
# create_interaction_content = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Create(
# interaction=chat_input_interaction,
# data=InteractionResponse(
# type=4,
# data=InteractionCallbackData(
# content="Processing your question ...", flags=64
# ),
# ),
# ).to_dict()

# logging.info(f"{prefix}Sending process question to discord-bot!")
# job_send(
# event=Event.DISCORD_BOT.INTERACTION_RESPONSE.CREATE,
# queue_name=Queue.DISCORD_BOT,
# content=create_interaction_content,
# )
logging.info(f"{prefix}Querying the data sources!")
# for now we have just the discord platform
response, source_nodes = query_multiple_source(
response, _ = query_multiple_source(
query=question,
community_id=community_id,
discord=True,
)

source_nodes_dict: list[dict[str, Any]] = []
for node in source_nodes:
node_dict = dict(node)
node_dict.pop("relationships", None)
source_nodes_dict.append(node_dict)
# source_nodes_dict: list[dict[str, Any]] = []
# for node in source_nodes:
# node_dict = dict(node)
# node_dict.pop("relationships", None)
# source_nodes_dict.append(node_dict)

results = {
"response": response,
"source_nodes": source_nodes_dict,
}
# results = {
# "response": response,
# The source of answers is commented for now
# "source_nodes": source_nodes_dict,
# }
results = response

response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
interaction=bot_given_info.to_dict(),
data=InteractionCallbackData(content=json.dumps(results)),
interaction=chat_input_interaction,
data=InteractionCallbackData(
# content=json.dumps(results)
content=results
),
).to_dict()

logging.info(f"{prefix}Sending Edit response to discord-bot!")
Expand Down
5 changes: 5 additions & 0 deletions celery_app/utils/fire_event.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Any

from tc_messageBroker import RabbitMQ
Expand All @@ -17,6 +18,7 @@ def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None:
content : dict[str, Any]
the content to send messages to
"""
logging.info(f"IN JOB_SEND!, event: {event}")

rabbit_creds = load_rabbitmq_credentials()
username = rabbit_creds["user"]
Expand All @@ -26,9 +28,12 @@ def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None:
rabbit_mq = RabbitMQ(
broker_url=broker_url, port=port, username=username, password=password
)
logging.info("Connecting to rabbitMQ!")
rabbit_mq.connect(queue_name)
logging.info("Trying to publish on rabbitMQ")
rabbit_mq.publish(
queue_name=queue_name,
event=event,
content=content,
)
logging.info("Published to RabbitMQ!")
1 change: 1 addition & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
- MONGODB_PORT=27017
- MONGODB_USER=root
- MONGODB_PASS=pass
- TRACELOOP_BASE_URL=some_url
- NEO4J_PROTOCOL=bolt
- NEO4J_HOST=neo4j
- NEO4J_PORT=7687
Expand Down
6 changes: 4 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ neo4j>=5.14.1, <6.0.0
coverage>=7.3.3, <8.0.0
pytest>=7.4.3, <8.0.0
python-dotenv==1.0.0
tc-hivemind-backend==1.1.0
tc-hivemind-backend==1.1.2
celery>=5.3.6, <6.0.0
guidance
tc-messageBroker>=1.6.4, <2.0.0
tc-messageBroker==1.6.6
traceloop-sdk==0.9.4
backoff==2.2.1
16 changes: 14 additions & 2 deletions worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
import logging
from typing import Any

import backoff
from celery_app.tasks import ask_question_auto_search
from pika.exceptions import ConnectionClosedByBroker
from tc_messageBroker import RabbitMQ
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import (
Expand All @@ -16,7 +19,9 @@ def query_llm(recieved_data: dict[str, Any]):
"""
query the llm using the received data
"""
recieved_input = ChatInputCommandInteraction.from_dict(recieved_data)
logging.info(f"RECIEVED DATA: {recieved_data}")
interaction = json.loads(recieved_data["content"]["interaction"])
recieved_input = ChatInputCommandInteraction.from_dict(interaction)
# For now we just have one user input
if len(recieved_input.options["_hoistedOptions"]) > 1:
logging.warning(
Expand All @@ -27,13 +32,20 @@ def query_llm(recieved_data: dict[str, Any]):
user_input = recieved_input.options["_hoistedOptions"][0]["value"]

community_id = fetch_community_id_by_guild_id(guild_id=recieved_input.guild_id)
logging.info(f"COMMUNITY_ID: {community_id} | Sending job to Celery!")
ask_question_auto_search.delay(
question=user_input,
community_id=community_id,
bot_given_info=recieved_data,
)


@backoff.on_exception(
wait_gen=backoff.expo,
exception=(ConnectionClosedByBroker, ConnectionError),
# waiting for 3 hours
max_time=60 * 60 * 3,
)
def job_recieve(broker_url, port, username, password):
rabbit_mq = RabbitMQ(
broker_url=broker_url, port=port, username=username, password=password
Expand All @@ -46,7 +58,7 @@ def job_recieve(broker_url, port, username, password):
if rabbit_mq.channel is not None:
rabbit_mq.channel.start_consuming()
else:
print("Connection to broker was not successful!")
logging.error("Connection to broker was not successful!")


if __name__ == "__main__":
Expand Down

0 comments on commit 8f98b46

Please sign in to comment.