Feat add hivemind etl scripts #15

Merged
merged 49 commits into from
Dec 21, 2023
Commits
8919742
feat: Adding the hivemind ETL scripts!
amindadgar Dec 13, 2023
0ee1e41
update: removing some parts that were for debugging!
amindadgar Dec 13, 2023
c7455de
update: removing phoenix llm monitoring tool for now!
amindadgar Dec 14, 2023
92f48b9
update: comment phoenix dags!
amindadgar Dec 14, 2023
8619ff3
update: Adding None values in case of channel, and day summaries!
amindadgar Dec 14, 2023
06fbf7c
Update: Discord summarization query!
amindadgar Dec 14, 2023
2f21fd9
fix: typo in help command of discourse_vectorstore_etl!
amindadgar Dec 14, 2023
eb08b06
update: Adding a condition to discourse data fetching!
amindadgar Dec 14, 2023
e3a6294
Update: Increased chunk size to 512!
amindadgar Dec 14, 2023
996d279
feat: Added the discord summary boundary case!
amindadgar Dec 14, 2023
c2f44c3
update: code cleaning with black!
amindadgar Dec 14, 2023
4926d60
fix: Updated roles id finding in text content!
amindadgar Dec 14, 2023
620d20e
feat: Updated the discord-vector-store interval!
amindadgar Dec 14, 2023
d1beb5e
feat: Adding discourse summarizer codes!
amindadgar Dec 14, 2023
690690c
udpate: moved the tests to its right directory!
amindadgar Dec 14, 2023
080a485
update: fixing the airflow image version to 2.7.3!
amindadgar Dec 14, 2023
bc26f1a
fix: each post always have 1 category!
amindadgar Dec 14, 2023
4e57b07
update: Added more test cases for discourse summary!
amindadgar Dec 14, 2023
d47e407
feat: Completing the discourse summary!
amindadgar Dec 18, 2023
ec5efa3
feat: commenting the debug parts and code cleaning!
amindadgar Dec 18, 2023
9657cc5
feat: For now excluding all metadata for discord summaries!
amindadgar Dec 19, 2023
7faf40c
feat: excluding all metadata in summaries!
amindadgar Dec 19, 2023
6045dfd
update: remove credentials printing!
amindadgar Dec 19, 2023
1a56e53
feat: Added logging to the iteration count of summaries!
amindadgar Dec 19, 2023
6006d49
feat: Added logs to summary preparation!
amindadgar Dec 19, 2023
b963c22
Merge branch 'main' into feat-add-hivemind-etl-discourse-summary
amindadgar Dec 19, 2023
142f0c4
update: removing duplicate codes!
amindadgar Dec 19, 2023
252eded
fix: linter issues based on super-linter rules!
amindadgar Dec 19, 2023
0ebdd2f
fix: more linter issues!
amindadgar Dec 19, 2023
b37aec6
fix: more linter issues!
amindadgar Dec 19, 2023
88338b2
fix: linter issues and the requiremnets.txt issue!
amindadgar Dec 19, 2023
d695bb2
feat: Added init files so pytest can find the tests!
amindadgar Dec 19, 2023
3ce2033
fix: pylint linter issue!
amindadgar Dec 19, 2023
200a401
trying more!
amindadgar Dec 19, 2023
7602d90
feat: added textlinter ignore for requirements.txt file!
amindadgar Dec 19, 2023
7b4cb79
trying more!
amindadgar Dec 19, 2023
761cf27
Merge branch 'main' into feat-add-hivemind-etl-discourse-summary
amindadgar Dec 19, 2023
f6a0d99
update: test cases with the latest code updates!
amindadgar Dec 20, 2023
5c55642
feat: Added new services to docker-compose!
amindadgar Dec 20, 2023
838ce68
fix: roles have different structure in text!
amindadgar Dec 20, 2023
4ff253e
update: test cases with latest code updates!
amindadgar Dec 20, 2023
242a43b
fix: docker-compose.test.yaml creds!
amindadgar Dec 20, 2023
2f27d52
trying to fix the textlinter error!
amindadgar Dec 20, 2023
c787ae3
update: removing the pypdf package for now!
amindadgar Dec 20, 2023
fafe587
Merge pull request #18 from TogetherCrew/feat-add-hivemind-etl-discou…
amindadgar Dec 20, 2023
c96d92d
feat: Added the embedding_dim and chunk_size as env variables!
amindadgar Dec 20, 2023
457c97d
fix: linter errors based on super-linter rules!
amindadgar Dec 20, 2023
d83d7d9
feat: Added the new env variables to the docker-compose!
amindadgar Dec 20, 2023
b27cac8
feat: reading embed dim from .env!
amindadgar Dec 20, 2023
5 changes: 4 additions & 1 deletion .gitignore
@@ -168,4 +168,7 @@ cython_debug/
.org

# Logs
-logs
+logs
+
+credentials_oauth.json
+credentials.json
97 changes: 97 additions & 0 deletions dags/hivemind_etl.py
@@ -0,0 +1,97 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping."""
from __future__ import annotations

import logging
from datetime import datetime, timedelta

import phoenix as px
from airflow import DAG
from airflow.decorators import task

from hivemind_etl_helpers.discord_mongo_summary_etl import process_discord_summaries
from hivemind_etl_helpers.discord_mongo_vector_store_etl import (
    process_discord_guild_mongo,
)
from hivemind_etl_helpers.src.utils.mongo import MongoSingleton


def setup_phoenix():
    _ = px.launch_app()
    logging.info(f"Phoenix Session URL: {px.active_session().url}")


with DAG(dag_id="phoenix_startup", start_date=datetime(2022, 3, 4)) as dag:
    dag.on_startup.append(setup_phoenix)


with DAG(
    dag_id="discord_vector_store_update",
    start_date=datetime(2022, 11, 10, 12),
    schedule_interval=timedelta(minutes=60),
    catchup=False,
) as dag:

    @task
    def get_all_discord_communities() -> list[str]:
        """
        Getting all communities having discord from database
        """
        mongo = MongoSingleton.get_instance()
        communities = (
            mongo.client["Core"]["platforms"]
            .find({"name": "discord"})
            .distinct("community")
        )
        return communities

    @task
    def start_discord_vectorstore(community_id: str):
        logging.info(f"Working on community, {community_id}")
        process_discord_guild_mongo(community_id=community_id)
        logging.info(f"Community {community_id} Job finished!")

    communities = get_all_discord_communities()
    # `start_discord_vectorstore` will be mapped over the list,
    # running once per community id
    start_discord_vectorstore.expand(community_id=communities)

with DAG(dag_id="discord_summary_vector_store", start_date=datetime(2023, 1, 1)) as dag:

    @task
    def get_all_discord_communities() -> list[str]:
        """
        Getting all communities having discord from database
        """
        mongo = MongoSingleton.get_instance()
        communities = (
            mongo.client["Core"]["platforms"]
            .find({"name": "discord"})
            .distinct("community")
        )
        return communities

    @task
    def start_discord_summary_vectorstore(community_id: str):
        logging.info(f"Working on community, {community_id}")
        process_discord_summaries(community_id=community_id, verbose=False)
        logging.info(f"Community {community_id} Job finished!")

    communities = get_all_discord_communities()
    # dynamic task mapping: `expand` runs the task once per community id
    start_discord_summary_vectorstore.expand(community_id=communities)
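
Both DAGs discover the target communities with the same MongoDB lookup. The hedged sketch below restates that lookup with plain pymongo so it can be sanity-checked outside Airflow; the `Core.platforms` collection layout is taken from the DAG code above, while the connection URI is a placeholder (the DAGs obtain their client through `MongoSingleton` instead).

```python
# Standalone check of the community lookup used by both DAGs (illustrative only).
from pymongo import MongoClient

# placeholder URI; not how the DAGs connect in production
client = MongoClient("mongodb://localhost:27017")

# every platform document with name "discord" contributes its distinct community id
community_ids = (
    client["Core"]["platforms"]
    .find({"name": "discord"})
    .distinct("community")
)
print(f"{len(community_ids)} Discord communities found:", community_ids)
```
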
13 changes: 13 additions & 0 deletions dags/hivemind_etl_helpers/README.md
@@ -0,0 +1,13 @@
# Hivemind ETL

In this repository we write the data ETL scripts for the hivemind bot.

## How to

For now, the scripts focus on Discord data stored in MongoDB and will store the embeddings in Postgres. To start the script for the MongoDB Discord data:

```bash
python discord_mongo_etl.py [guild_id]
```

Note: please replace [guild_id] with your guild id.
122 changes: 122 additions & 0 deletions dags/hivemind_etl_helpers/discord_mongo_summary_etl.py
@@ -0,0 +1,122 @@
import argparse
import logging

from llama_index.response_synthesizers import get_response_synthesizer

from hivemind_etl_helpers.src.db.discord.discord_summary import DiscordSummary
from hivemind_etl_helpers.src.db.discord.find_guild_id import (
    find_guild_id_by_community_id,
)
from hivemind_etl_helpers.src.document_node_parser import configure_node_parser
from hivemind_etl_helpers.src.utils.cohere_embedding import CohereEmbedding
from hivemind_etl_helpers.src.utils.pg_db_utils import setup_db
from hivemind_etl_helpers.src.utils.pg_vector_access import PGVectorAccess


def process_discord_summaries(community_id: str, verbose: bool = False) -> None:
    """
    Prepare the discord data by grouping it into thread, channel and day summaries
    and save the summaries within the database.
    Note: This will always process the data until 1 day ago.

    Parameters
    ------------
    community_id : str
        the community id to process its guild data
    verbose : bool
        whether to print out the summarization process
        default is `False`
    """
    guild_id = find_guild_id_by_community_id(community_id)
    logging.info(f"COMMUNITYID: {community_id}, GUILDID: {guild_id}")
    table_name = "discord_summary"
    dbname = f"community_{community_id}"

    latest_date_query = f"""
        SELECT (metadata_->> 'date')::timestamp
        AS latest_date
        FROM data_{table_name}
        ORDER BY (metadata_->>'date')::timestamp DESC
        LIMIT 1;
    """
    from_date = setup_db(
        community_id=community_id, dbname=dbname, latest_date_query=latest_date_query
    )

    discord_summary = DiscordSummary(
        response_synthesizer=get_response_synthesizer(response_mode="tree_summarize"),
        verbose=verbose,
    )

    (
        thread_summaries_documents,
        channel_summary_documents,
        daily_summary_documents,
    ) = discord_summary.prepare_summaries(
        guild_id=guild_id,
        from_date=from_date,
        summarization_query="Please give me a summary using the data you have!",
    )

    logging.info("Getting the summaries embedding and saving within database!")

    node_parser = configure_node_parser(chunk_size=256)
    pg_vector = PGVectorAccess(table_name=table_name, dbname=dbname)

    embed_model = CohereEmbedding()
    embed_dim = 1024

    # saving thread summaries
    pg_vector.save_documents_in_batches(
        community_id=community_id,
        documents=thread_summaries_documents,
        batch_size=100,
        node_parser=node_parser,
        max_request_per_minute=None,
        embed_model=embed_model,
        embed_dim=embed_dim,
        request_per_minute=10000,
    )

    # saving daily summaries
    pg_vector.save_documents_in_batches(
        community_id=community_id,
        documents=daily_summary_documents,
        batch_size=100,
        node_parser=node_parser,
        max_request_per_minute=None,
        embed_model=embed_model,
        embed_dim=embed_dim,
        request_per_minute=10000,
    )

    # saving channel summaries
    pg_vector.save_documents_in_batches(
        community_id=community_id,
        documents=channel_summary_documents,
        batch_size=100,
        node_parser=node_parser,
        max_request_per_minute=None,
        embed_model=embed_model,
        embed_dim=embed_dim,
        request_per_minute=10000,
    )


if __name__ == "__main__":
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "community_id", type=str, help="the Community that the guild is related to"
    )
    args = parser.parse_args()
    process_discord_summaries(community_id=args.community_id)
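
As a quick way to verify what the incremental logic above will resume from, the following hedged sketch runs the same `latest_date_query` by hand with psycopg2. The `data_discord_summary` table name and the JSON `metadata_` column follow the query in the script; the connection parameters are placeholders, not values from this PR.

```python
# Inspect the latest stored summary date by hand (illustrative; connection values are placeholders).
import psycopg2

conn = psycopg2.connect(
    dbname="community_<community_id>",  # placeholder database name
    user="postgres",
    password="<password>",
    host="localhost",
)
with conn, conn.cursor() as cur:
    # same query the ETL uses to decide where to resume from
    cur.execute(
        """
        SELECT (metadata_->>'date')::timestamp AS latest_date
        FROM data_discord_summary
        ORDER BY (metadata_->>'date')::timestamp DESC
        LIMIT 1;
        """
    )
    row = cur.fetchone()
    print("latest stored summary date:", row[0] if row else None)
```
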
77 changes: 77 additions & 0 deletions dags/hivemind_etl_helpers/discord_mongo_vector_store_etl.py
@@ -0,0 +1,77 @@
import argparse
import logging
from datetime import timedelta

from hivemind_etl_helpers.src.db.discord.discord_raw_message_to_document import (
    discord_raw_to_docuemnts,
)
from hivemind_etl_helpers.src.db.discord.find_guild_id import (
    find_guild_id_by_community_id,
)
from hivemind_etl_helpers.src.document_node_parser import configure_node_parser
from hivemind_etl_helpers.src.utils.cohere_embedding import CohereEmbedding
from hivemind_etl_helpers.src.utils.pg_db_utils import setup_db
from hivemind_etl_helpers.src.utils.pg_vector_access import PGVectorAccess


def process_discord_guild_mongo(community_id: str) -> None:
    """
    Process the discord guild messages from mongodb
    and save the processed data within postgres.

    Parameters
    -----------
    community_id : str
        the community id to create or use its database
    """
    guild_id = find_guild_id_by_community_id(community_id)
    logging.info(f"COMMUNITYID: {community_id}, GUILDID: {guild_id}")
    table_name = "discord"
    dbname = f"community_{community_id}"

    latest_date_query = f"""
        SELECT (metadata_->> 'date')::timestamp
        AS latest_date
        FROM data_{table_name}
        ORDER BY (metadata_->>'date')::timestamp DESC
        LIMIT 1;
    """
    from_date = setup_db(
        community_id=community_id, dbname=dbname, latest_date_query=latest_date_query
    )

    # because the timestamp stored in postgres does not keep milliseconds
    # we might get duplicate messages,
    # so add just one second to the latest date
    if from_date is not None:
        from_date += timedelta(seconds=1)

    documents = discord_raw_to_docuemnts(guild_id=guild_id, from_date=from_date)
    node_parser = configure_node_parser(chunk_size=256)
    pg_vector = PGVectorAccess(table_name=table_name, dbname=dbname)

    embed_model = CohereEmbedding()
    embed_dim = 1024

    pg_vector.save_documents_in_batches(
        community_id=community_id,
        documents=documents,
        batch_size=100,
        node_parser=node_parser,
        max_request_per_minute=None,
        embed_model=embed_model,
        embed_dim=embed_dim,
        request_per_minute=10000,
        # max_request_per_day=REQUEST_PER_DAY,
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "community_id", type=str, help="the Community that the guild is related to"
    )
    args = parser.parse_args()

    process_discord_guild_mongo(community_id=args.community_id)
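
The one-second bump applied to `from_date` above deserves a concrete illustration. The hedged sketch below (not part of the PR, with made-up timestamps) shows why resuming from a second-precision latest date would re-fetch the last already-stored message, and how adding one second avoids that at the cost of possibly skipping other messages within that same second.

```python
# Illustration of the `from_date += timedelta(seconds=1)` adjustment (not from the PR).
from datetime import datetime, timedelta

# latest date as read back from the vector store: sub-second part lost
stored_latest = datetime(2023, 12, 20, 10, 15, 42)
# the original message behind that stored row had millisecond precision
last_stored_message_time = datetime(2023, 12, 20, 10, 15, 42, 831000)

# naive resume point: the already-stored message would be fetched again
assert last_stored_message_time >= stored_latest

# bumped resume point: the already-stored message is excluded from the next run
from_date = stored_latest + timedelta(seconds=1)
assert last_stored_message_time < from_date
```
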