diff --git a/.github/workflows/production.yml b/.github/workflows/production.yml index a1be27b..940fb0e 100644 --- a/.github/workflows/production.yml +++ b/.github/workflows/production.yml @@ -9,4 +9,4 @@ jobs: ci: uses: TogetherCrew/operations/.github/workflows/ci.yml@main secrets: - CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} \ No newline at end of file + CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} diff --git a/.github/workflows/start.staging.yml b/.github/workflows/start.staging.yml index 842e3bd..24e53da 100644 --- a/.github/workflows/start.staging.yml +++ b/.github/workflows/start.staging.yml @@ -6,4 +6,4 @@ jobs: ci: uses: TogetherCrew/operations/.github/workflows/ci.yml@main secrets: - CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} \ No newline at end of file + CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} diff --git a/.gitignore b/.gitignore index 3a5bd6b..0c9e5b4 100644 --- a/.gitignore +++ b/.gitignore @@ -160,4 +160,5 @@ cython_debug/ #.idea/ hivemind-bot-env/* -main.ipynb \ No newline at end of file +main.ipynb +.DS_Store diff --git a/Dockerfile b/Dockerfile index e2734a2..62fe006 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # It's recommended that we use `bullseye` for Python (alpine isn't suitable as it conflcts with numpy) -FROM python:3.11-bullseye AS base +FROM python:3.11-bullseye AS base WORKDIR /project COPY . . RUN pip3 install -r requirements.txt diff --git a/README.md b/README.md index 6c6b912..6be17a4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # hivemind-bot + This repository is made for TogetherCrew's LLM bot. diff --git a/bot/retrievers/summary_retriever_base.py b/bot/retrievers/summary_retriever_base.py index 0095a7f..6160c17 100644 --- a/bot/retrievers/summary_retriever_base.py +++ b/bot/retrievers/summary_retriever_base.py @@ -42,7 +42,7 @@ def get_similar_nodes( ) -> list[NodeWithScore]: """ get k similar nodes to the query. - Note: this funciton wold get the embedding + Note: this function wold get the embedding for the query to do the similarity search. Parameters diff --git a/bot/retrievers/utils/load_hyperparams.py b/bot/retrievers/utils/load_hyperparams.py index 98db6ce..a5dc9ea 100644 --- a/bot/retrievers/utils/load_hyperparams.py +++ b/bot/retrievers/utils/load_hyperparams.py @@ -14,7 +14,7 @@ def load_hyperparams() -> tuple[int, int, int]: to get the `k1` count similar nodes k2 : int the value for the secondary raw search - to get the `k2` count simliar nodes + to get the `k2` count similar nodes d : int the before and after day interval """ diff --git a/celery_app/job_send.py b/celery_app/job_send.py deleted file mode 100644 index c1f59ae..0000000 --- a/celery_app/job_send.py +++ /dev/null @@ -1,30 +0,0 @@ -from tc_messageBroker import RabbitMQ -from tc_messageBroker.rabbit_mq.event import Event -from tc_messageBroker.rabbit_mq.queue import Queue - - -def job_send(broker_url, port, username, password, res): - rabbit_mq = RabbitMQ( - broker_url=broker_url, port=port, username=username, password=password - ) - - content = { - "uuid": "d99a1490-fba6-11ed-b9a9-0d29e7612dp8", - "data": f"some results {res}", - } - - rabbit_mq.connect(Queue.DISCORD_ANALYZER) - rabbit_mq.publish( - queue_name=Queue.DISCORD_ANALYZER, - event=Event.DISCORD_BOT.FETCH, - content=content, - ) - - -if __name__ == "__main__": - # TODO: read from .env - broker_url = "localhost" - port = 5672 - username = "root" - password = "pass" - job_send(broker_url, port, username, password, "CALLED FROM __main__") diff --git a/celery_app/tasks.py b/celery_app/tasks.py index 5dd154e..4af7d9a 100644 --- a/celery_app/tasks.py +++ b/celery_app/tasks.py @@ -1,29 +1,90 @@ -from celery_app.job_send import job_send -from celery_app.server import app -from utils.credentials import load_rabbitmq_credentials +import json +import logging +from typing import Any -# TODO: Write tasks that match our requirements +from celery_app.server import app +from celery_app.utils.fire_event import job_send +from subquery import query_multiple_source +from tc_messageBroker.rabbit_mq.event import Event +from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import ( + InteractionCallbackData, +) +from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import ( + ChatInputCommandInteraction, +) +from tc_messageBroker.rabbit_mq.payload.discord_bot.interaction_response import ( + InteractionResponse, +) +from tc_messageBroker.rabbit_mq.payload.payload import Payload +from tc_messageBroker.rabbit_mq.queue import Queue @app.task -def add(x, y): - rabbit_creds = load_rabbitmq_credentials() - username = rabbit_creds["user"] - password = rabbit_creds["password"] - broker_url = rabbit_creds["host"] - port = rabbit_creds["port"] +def ask_question_auto_search( + question: str, + community_id: str, + bot_given_info: ChatInputCommandInteraction, +) -> None: + """ + this task is for the case that the user asks a question + it would first retrieve the search metadata from summaries + then perform a query on the filetred raw data to find answer - res = x + y - job_send(broker_url, port, username, password, res) + Parameters + ------------ + question : str + the user question + community_id : str + the community that the question was asked in + bot_given_info : tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction.ChatInputCommandInteraction + the information data that needed to be sent back to the bot again. + This would be the `ChatInputCommandInteraction`. + """ + prefix = f"COMMUNITY_ID: {community_id} | " + logging.info(f"{prefix}Processing question!") + create_interaction_content = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Create( + interaction=bot_given_info.to_dict(), + data=InteractionResponse( + type=4, + data=InteractionCallbackData( + content="Processing your question ...", flags=64 + ), + ), + ).to_dict() - return res + logging.info(f"{prefix}Sending process question to discord-bot!") + job_send( + event=Event.DISCORD_BOT.INTERACTION_RESPONSE.CREATE, + queue_name=Queue.DISCORD_BOT, + content=create_interaction_content, + ) + logging.info(f"{prefix}Querying the data sources!") + # for now we have just the discord platform + response, source_nodes = query_multiple_source( + query=question, + community_id=community_id, + discord=True, + ) + source_nodes_dict: list[dict[str, Any]] = [] + for node in source_nodes: + node_dict = dict(node) + node_dict.pop("relationships", None) + source_nodes_dict.append(node_dict) -@app.task -def mul(x, y): - return x * y + results = { + "response": response, + "source_nodes": source_nodes_dict, + } + response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit( + interaction=bot_given_info.to_dict(), + data=InteractionCallbackData(content=json.dumps(results)), + ).to_dict() -@app.task -def xsum(numbers): - return sum(numbers) + logging.info(f"{prefix}Sending Edit response to discord-bot!") + job_send( + event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT, + queue_name=Queue.DISCORD_BOT, + content=response_payload, + ) diff --git a/celery_app/utils/__init__.py b/celery_app/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/celery_app/utils/fire_event.py b/celery_app/utils/fire_event.py new file mode 100644 index 0000000..f74bb96 --- /dev/null +++ b/celery_app/utils/fire_event.py @@ -0,0 +1,34 @@ +from typing import Any + +from tc_messageBroker import RabbitMQ +from utils.credentials import load_rabbitmq_credentials + + +def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None: + """ + fire the data to a specific event on a specific queue + + Parameters + ----------- + event : str + the event to fire message to + queue_name : str + the queue to fire message on + content : dict[str, Any] + the content to send messages to + """ + + rabbit_creds = load_rabbitmq_credentials() + username = rabbit_creds["user"] + password = rabbit_creds["password"] + broker_url = rabbit_creds["host"] + port = rabbit_creds["port"] + rabbit_mq = RabbitMQ( + broker_url=broker_url, port=port, username=username, password=password + ) + rabbit_mq.connect(queue_name) + rabbit_mq.publish( + queue_name=queue_name, + event=event, + content=content, + ) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 6375962..786b4d7 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -49,7 +49,7 @@ services: - NEO4J_PLUGINS=["apoc", "graph-data-science"] - NEO4J_dbms_security_procedures_unrestricted=apoc.*,gds.* healthcheck: - test: ["CMD" ,"wget", "http://localhost:7474"] + test: ["CMD", "wget", "http://localhost:7474"] interval: 1m30s timeout: 10s retries: 2 diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 5127573..fa3c438 100644 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -1,3 +1,3 @@ #!/usr/bin/env bash python3 -m coverage run --omit=tests/* -m pytest . -python3 -m coverage lcov -o coverage/lcov.info \ No newline at end of file +python3 -m coverage lcov -o coverage/lcov.info diff --git a/requirements.txt b/requirements.txt index 0a35833..ee5546f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ numpy -llama-index>=0.9.26, <1.0.0 +llama-index>=0.9.27, <1.0.0 pymongo python-dotenv pgvector @@ -19,3 +19,4 @@ python-dotenv==1.0.0 tc-hivemind-backend==1.1.0 celery>=5.3.6, <6.0.0 guidance +tc-messageBroker>=1.6.4, <2.0.0 \ No newline at end of file diff --git a/subquery.py b/subquery.py index 58dfd21..44a8a86 100644 --- a/subquery.py +++ b/subquery.py @@ -1,6 +1,6 @@ from guidance.models import OpenAIChat from llama_index import QueryBundle, ServiceContext -from llama_index.core import BaseQueryEngine +from llama_index.core.base_query_engine import BaseQueryEngine from llama_index.query_engine import SubQuestionQueryEngine from llama_index.question_gen.guidance_generator import GuidanceQuestionGenerator from llama_index.schema import NodeWithScore @@ -12,12 +12,12 @@ def query_multiple_source( query: str, community_id: str, - discord: bool, - discourse: bool, - gdrive: bool, - notion: bool, - telegram: bool, - github: bool, + discord: bool = False, + discourse: bool = False, + gdrive: bool = False, + notion: bool = False, + telegram: bool = False, + github: bool = False, ) -> tuple[str, list[NodeWithScore]]: """ query multiple platforms and get an answer from the multiple @@ -30,23 +30,29 @@ def query_multiple_source( the community id to get their data discord : bool if `True` then add the engine to the subquery_generator + default is set to False discourse : bool if `True` then add the engine to the subquery_generator + default is set to False gdrive : bool if `True` then add the engine to the subquery_generator + default is set to False notion : bool if `True` then add the engine to the subquery_generator + default is set to False telegram : bool if `True` then add the engine to the subquery_generator + default is set to False github : bool if `True` then add the engine to the subquery_generator + default is set to False Returns -------- response : str, the response to the user query from the LLM - using the engines of the given platforms (pltform equal to True) + using the engines of the given platforms (platform equal to True) source_nodes : list[NodeWithScore] the list of nodes that were source of answering """ diff --git a/tests/integration/test_fetch_community_id.py b/tests/integration/test_fetch_community_id.py new file mode 100644 index 0000000..5ee8f74 --- /dev/null +++ b/tests/integration/test_fetch_community_id.py @@ -0,0 +1,80 @@ +from datetime import datetime, timedelta +from unittest import TestCase + +from bson import ObjectId +from utils.fetch_community_id import fetch_community_id_by_guild_id +from utils.mongo import MongoSingleton + + +class TestFetchDiscordCommunityId(TestCase): + def add_platform(self): + client = MongoSingleton.get_instance().get_client() + + action = { + "INT_THR": 1, + "UW_DEG_THR": 1, + "PAUSED_T_THR": 1, + "CON_T_THR": 4, + "CON_O_THR": 3, + "EDGE_STR_THR": 5, + "UW_THR_DEG_THR": 5, + "VITAL_T_THR": 4, + "VITAL_O_THR": 3, + "STILL_T_THR": 2, + "STILL_O_THR": 2, + "DROP_H_THR": 2, + "DROP_I_THR": 1, + } + + client["Core"]["platforms"].insert_one( + { + "_id": ObjectId(self.platform_id), + "name": "discord", + "metadata": { + "id": self.guild_id, + "icon": "111111111111111111111111", + "name": "A guild", + "selectedChannels": [ + {"channelId": "1020707129214111827", "channelName": "general"} + ], + "window": {"period_size": 7, "step_size": 1}, + "action": action, + "period": datetime.now() - timedelta(days=30), + }, + "community": ObjectId(self.community_id), + "disconnectedAt": None, + "connectedAt": (datetime.now() - timedelta(days=40)), + "isInProgress": True, + "createdAt": datetime(2023, 11, 1), + "updatedAt": datetime(2023, 11, 1), + } + ) + + def delete_platform(self): + client = MongoSingleton.get_instance().get_client() + client["Core"]["platforms"].delete_one({"_id": ObjectId(self.platform_id)}) + + def test_get_guild_id(self): + self.platform_id = "515151515151515151515151" + self.guild_id = "1234" + self.community_id = "aabbccddeeff001122334455" + self.delete_platform() + self.add_platform() + + community_id = fetch_community_id_by_guild_id(guild_id=self.guild_id) + + self.assertEqual(community_id, self.community_id) + + def test_get_guild_id_no_data(self): + self.platform_id = "515151515151515151515151" + self.guild_id = "1234" + self.community_id = "aabbccddeeff001122334455" + + self.delete_platform() + self.add_platform() + + client = MongoSingleton.get_instance().get_client() + client["Core"]["platforms"].delete_one({"_id": ObjectId(self.platform_id)}) + + with self.assertRaises(ValueError): + _ = fetch_community_id_by_guild_id(guild_id=self.guild_id) diff --git a/tests/unit/test_prepare_discord_query_engine.py b/tests/unit/test_prepare_discord_query_engine.py index 72ad1a6..45cc348 100644 --- a/tests/unit/test_prepare_discord_query_engine.py +++ b/tests/unit/test_prepare_discord_query_engine.py @@ -1,7 +1,7 @@ import os import unittest -from llama_index.core import BaseQueryEngine +from llama_index.core.base_query_engine import BaseQueryEngine from llama_index.vector_stores import ExactMatchFilter, FilterCondition, MetadataFilters from utils.query_engine.discord_query_engine import prepare_discord_engine diff --git a/utils/credentials.py b/utils/credentials.py index 234c82c..1ccf020 100644 --- a/utils/credentials.py +++ b/utils/credentials.py @@ -5,13 +5,13 @@ def load_postgres_credentials() -> dict[str, str]: """ - load posgresql db credentials from .env + load postgresql db credentials from .env Returns: --------- postgres_creds : dict[str, Any] postgresql credentials - a dictionary representive of + a dictionary representative of `user`: str `password` : str `host` : str @@ -37,7 +37,7 @@ def load_rabbitmq_credentials() -> dict[str, str]: --------- rabbitmq_creds : dict[str, Any] rabbitmq credentials - a dictionary representive of + a dictionary representative of `user`: str `password` : str `host` : str @@ -53,3 +53,29 @@ def load_rabbitmq_credentials() -> dict[str, str]: rabbitmq_creds["port"] = os.getenv("RABBIT_PORT", "") return rabbitmq_creds + + +def load_mongo_credentials() -> dict[str, str]: + """ + load mongo db credentials from .env + + Returns: + --------- + mongo_creds : dict[str, Any] + mongodb credentials + a dictionary representative of + `user`: str + `password` : str + `host` : str + `port` : int + """ + load_dotenv() + + mongo_creds = {} + + mongo_creds["user"] = os.getenv("MONGODB_USER", "") + mongo_creds["password"] = os.getenv("MONGODB_PASS", "") + mongo_creds["host"] = os.getenv("MONGODB_HOST", "") + mongo_creds["port"] = os.getenv("MONGODB_PORT", "") + + return mongo_creds diff --git a/utils/fetch_community_id.py b/utils/fetch_community_id.py new file mode 100644 index 0000000..654688e --- /dev/null +++ b/utils/fetch_community_id.py @@ -0,0 +1,27 @@ +from .mongo import MongoSingleton + + +def fetch_community_id_by_guild_id(guild_id: str) -> str: + """ + find the community id using the given guild id + + Parameters + ----------- + guild_id : str + the discord guild to find its community id + + Returns + --------- + community_id : str + the community id that the guild is for + """ + + client = MongoSingleton.get_instance().get_client() + + platform = client["Core"]["platforms"].find_one( + {"metadata.id": guild_id, "name": "discord"}, {"community": 1} + ) + if platform is None: + raise ValueError(f"The guild id {guild_id} does not exist!") + platform_id = str(platform["community"]) + return platform_id diff --git a/utils/mongo.py b/utils/mongo.py new file mode 100644 index 0000000..4259cda --- /dev/null +++ b/utils/mongo.py @@ -0,0 +1,45 @@ +import logging +from typing import Any + +from pymongo import MongoClient + +from .credentials import load_mongo_credentials + + +class MongoSingleton: + __instance = None + + def __init__(self): + if MongoSingleton.__instance is not None: + raise Exception("This class is a singleton!") + else: + creds = load_mongo_credentials() + connection_uri = config_mogno_creds(creds) + self.client = MongoClient(connection_uri) + MongoSingleton.__instance = self + + @staticmethod + def get_instance(): + if MongoSingleton.__instance is None: + MongoSingleton() + try: + info = MongoSingleton.__instance.client.server_info() + logging.info(f"MongoDB Connected Successfully! server info: {info}") + except Exception as exp: + logging.error(f"MongoDB not connected! exp: {exp}") + + return MongoSingleton.__instance + + def get_client(self): + return self.client + + +def config_mogno_creds(mongo_creds: dict[str, Any]): + user = mongo_creds["user"] + password = mongo_creds["password"] + host = mongo_creds["host"] + port = mongo_creds["port"] + + connection = f"mongodb://{user}:{password}@{host}:{port}" + + return connection diff --git a/utils/query_engine/discord_query_engine.py b/utils/query_engine/discord_query_engine.py index 6a29833..d5bdaf7 100644 --- a/utils/query_engine/discord_query_engine.py +++ b/utils/query_engine/discord_query_engine.py @@ -1,7 +1,7 @@ from bot.retrievers.forum_summary_retriever import ForumBasedSummaryRetriever from bot.retrievers.process_dates import process_dates from bot.retrievers.utils.load_hyperparams import load_hyperparams -from llama_index.core import BaseQueryEngine +from llama_index.core.base_query_engine import BaseQueryEngine from llama_index.vector_stores import ExactMatchFilter, FilterCondition, MetadataFilters from tc_hivemind_backend.embeddings.cohere import CohereEmbedding from tc_hivemind_backend.pg_vector_access import PGVectorAccess @@ -106,7 +106,7 @@ def prepare_discord_engine_auto_filter( the query (question) of the user similarity_top_k : int | None the value for the initial summary search - to get the `k2` count simliar nodes + to get the `k2` count similar nodes if `None`, then would read from `.env` d : int this would make the secondary search (`prepare_discord_engine`) diff --git a/worker.py b/worker.py index 48ddb05..41c4f4d 100644 --- a/worker.py +++ b/worker.py @@ -1,16 +1,37 @@ -from celery_app.tasks import add +import logging +from typing import Any + +from celery_app.tasks import ask_question_auto_search from tc_messageBroker import RabbitMQ from tc_messageBroker.rabbit_mq.event import Event +from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import ( + ChatInputCommandInteraction, +) from tc_messageBroker.rabbit_mq.queue import Queue from utils.credentials import load_rabbitmq_credentials - - -# TODO: Update according to our requirements -def do_something(recieved_data): - message = "Calculation Results:" - print(message) - print(f"recieved_data: {recieved_data}") - add.delay(20, 14) +from utils.fetch_community_id import fetch_community_id_by_guild_id + + +def query_llm(recieved_data: dict[str, Any]): + """ + query the llm using the received data + """ + recieved_input = ChatInputCommandInteraction.from_dict(recieved_data) + # For now we just have one user input + if len(recieved_input.options["_hoistedOptions"]) > 1: + logging.warning( + "_hoistedOptions does contain more user inputs " + "but for now we're just using the first one!" + ) + + user_input = recieved_input.options["_hoistedOptions"][0]["value"] + + community_id = fetch_community_id_by_guild_id(guild_id=recieved_input.guild_id) + ask_question_auto_search.delay( + question=user_input, + community_id=community_id, + bot_given_info=recieved_data, + ) def job_recieve(broker_url, port, username, password): @@ -18,8 +39,7 @@ def job_recieve(broker_url, port, username, password): broker_url=broker_url, port=port, username=username, password=password ) - # TODO: Update according to our requirements - rabbit_mq.on_event(Event.HIVEMIND.INTERACTION_CREATED, do_something) + rabbit_mq.on_event(Event.HIVEMIND.INTERACTION_CREATED, query_llm) rabbit_mq.connect(Queue.HIVEMIND) rabbit_mq.consume(Queue.HIVEMIND)