Merge pull request #153 from TogetherCrew/feat/add-qdrant
feat: Adding Qdrant vectorstore db support!
amindadgar authored May 20, 2024
2 parents 96b7504 + 849278b commit 5ce6b54
Showing 17 changed files with 169 additions and 233 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -3,7 +3,7 @@ USER root
COPY . .
RUN chmod +x init.sh
USER airflow
RUN pip install -r requirements.txt
RUN pip install --no-cache-dir apache-airflow==2.9.1 -r requirements.txt

FROM python:3.11-bullseye AS test
WORKDIR /project
70 changes: 0 additions & 70 deletions dags/hivemind_etl_helpers/gdrive_ingestion_etl.py

This file was deleted.

70 changes: 10 additions & 60 deletions dags/hivemind_etl_helpers/github_etl.py
@@ -2,6 +2,7 @@
from datetime import datetime

from dotenv import load_dotenv
from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline
from hivemind_etl_helpers.src.db.github.extract import (
GithubExtraction,
fetch_issues,
@@ -10,13 +11,8 @@
from hivemind_etl_helpers.src.db.github.github_organization_repos import (
get_github_organization_repos,
)
from hivemind_etl_helpers.src.db.github.load import (
PrepareDeletion,
load_documents_into_pg_database,
)
from hivemind_etl_helpers.src.db.github.transform import GitHubTransformation
from llama_index.core import Document
from tc_hivemind_backend.db.pg_db_utils import setup_db


def process_github_vectorstore(
@@ -40,30 +36,8 @@ def process_github_vectorstore(
the date to start processing data from
"""
load_dotenv()
dbname = f"community_{community_id}"
prefix = f"COMMUNITYID: {community_id} "
logging.info(prefix)

table_name = "github"

logging.info(f"{prefix}Setting up database")
latest_date_query = f"""
SELECT (metadata_->> 'created_at')::timestamp
AS latest_date
FROM data_{table_name}
ORDER BY (metadata_->>'created_at')::timestamp DESC
LIMIT 1;
"""
from_date_saved_data = setup_db(
community_id=community_id, dbname=dbname, latest_date_query=latest_date_query
)
from_date: datetime | None
if from_date_saved_data:
from_date = from_date_saved_data
else:
from_date = from_starting_date

logging.info(f"Fetching data from date: {from_date}")
logging.info(f"{prefix}Processing data!")

org_repository_ids = get_github_organization_repos(
github_organization_ids=github_org_ids
@@ -73,18 +47,10 @@

# EXTRACT
github_extractor = GithubExtraction()
github_comments = github_extractor.fetch_comments(
repository_id=repository_ids, from_date=from_date
)
github_commits = github_extractor.fetch_commits(
repository_id=repository_ids, from_date=from_date
)
github_issues = fetch_issues(repository_id=repository_ids, from_date=from_date)
github_prs = fetch_pull_requests(
repository_id=repository_ids,
from_date_created=from_starting_date,
from_date_updated=from_date_saved_data,
)
github_comments = github_extractor.fetch_comments(repository_id=repository_ids)
github_commits = github_extractor.fetch_commits(repository_id=repository_ids)
github_issues = fetch_issues(repository_id=repository_ids)
github_prs = fetch_pull_requests(repository_id=repository_ids)

# TRANSFORM
# llama-index documents
@@ -100,29 +66,13 @@ def process_github_vectorstore(
logging.debug(f"{prefix}Transforming pull requests!")
docs_prs = github_transformation.transform_pull_requests(github_prs)

# there's no update on commits
all_documents: list[Document] = docs_commit.copy()

# checking for updates on prs, issues, and comments
delete_docs = PrepareDeletion(community_id)
docs_to_save, deletion_query = delete_docs.prepare(
pr_documents=docs_prs,
issue_documents=docs_issue,
comment_documents=docs_comment + docs_issue_comments,
all_documents: list[Document] = (
docs_commit + docs_comment + docs_issue_comments + docs_prs + docs_issue
)
all_documents.extend(docs_to_save)

logging.debug(f"{len(all_documents)} prepared to be saved!")
if len(all_documents) == 0:
logging.info("No new documents to save!")

logging.info(f"deletion_query: {deletion_query}")

# LOAD
logging.info(f"{prefix}Loading data into postgres db")
load_documents_into_pg_database(
documents=all_documents,
community_id=community_id,
table_name=table_name,
deletion_query=deletion_query,
)
ingestion_pipeline = CustomIngestionPipeline(community_id, collection_name="github")
ingestion_pipeline.run_pipeline(docs=all_documents)
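
With this change the GitHub ETL keeps the same extract and transform steps but no longer writes to Postgres itself; loading is delegated to the shared ingestion pipeline. A minimal sketch of the new load step (the wrapper function name is illustrative and not part of the PR):

from llama_index.core import Document
from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline

def load_github_documents(community_id: str, all_documents: list[Document]) -> None:
    # "github" is the platform name; combined with the community id it forms
    # the Qdrant collection name, e.g. "<community_id>_github".
    pipeline = CustomIngestionPipeline(community_id, collection_name="github")
    pipeline.run_pipeline(docs=all_documents)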
75 changes: 41 additions & 34 deletions dags/hivemind_etl_helpers/ingestion_pipeline.py
@@ -1,63 +1,70 @@
from dags.hivemind_etl_helpers.src.db.gdrive.db_utils import setup_db
from dags.hivemind_etl_helpers.src.utils.redis import RedisSingleton
from llama_index.core import MockEmbedding
import logging

from hivemind_etl_helpers.src.utils.credentials import load_redis_credentials
from hivemind_etl_helpers.src.utils.mongo import get_mongo_uri
from hivemind_etl_helpers.src.utils.redis import RedisSingleton
from llama_index.core import Document, MockEmbedding
from llama_index.core.ingestion import (
DocstoreStrategy,
IngestionCache,
IngestionPipeline,
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.storage.docstore.postgres import PostgresDocumentStore
from llama_index.storage.docstore.mongodb import MongoDocumentStore
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.vector_stores.postgres import PGVectorStore
from tc_hivemind_backend.db.credentials import load_postgres_credentials
from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.db.utils.model_hyperparams import load_model_hyperparams
from tc_hivemind_backend.embeddings.cohere import CohereEmbedding
from tc_hivemind_backend.qdrant_vector_access import QDrantVectorAccess


class CustomIngestionPipeline:
def __init__(self, community_id: str, table_name: str, testing: bool = False):
self.postgres_credentials = load_postgres_credentials()
self.table_name = table_name
self.dbname = f"community_{community_id}"
def __init__(self, community_id: str, collection_name: str, testing: bool = False):
self.community_id = community_id
self.qdrant_client = QdrantSingleton.get_instance().client

_, self.embedding_dim = load_model_hyperparams()
self.pg_creds = load_postgres_credentials()
self.redis_cred = load_redis_credentials()
self.collection_name = community_id
self.platform_name = collection_name

self.embed_model = (
CohereEmbedding() if not testing else MockEmbedding(embed_dim=1024)
CohereEmbedding()
if not testing
else MockEmbedding(embed_dim=self.embedding_dim)
)
self.redis_client = RedisSingleton.get_instance().get_client()

def run_pipeline(self, docs):
_, embedding_dim = load_model_hyperparams()
setup_db(community_id=self.community_id)
def run_pipeline(self, docs: list[Document]):
# Qdrant is collection-based; unlike Postgres there is no per-community database
qdrant_collection_name = f"{self.collection_name}_{self.platform_name}"
vector_access = QDrantVectorAccess(collection_name=qdrant_collection_name)
vector_store = vector_access.setup_qdrant_vector_store()

pipeline = IngestionPipeline(
transformations=[
SemanticSplitterNodeParser(embed_model=self.embed_model),
self.embed_model,
],
docstore=PostgresDocumentStore.from_params(
host=self.postgres_credentials["host"],
port=self.postgres_credentials["port"],
user=self.postgres_credentials["user"],
password=self.postgres_credentials["password"],
database=self.dbname,
table_name=self.table_name + "_docstore",
),
vector_store=PGVectorStore.from_params(
host=self.postgres_credentials["host"],
port=self.postgres_credentials["port"],
user=self.postgres_credentials["user"],
password=self.postgres_credentials["password"],
database=self.dbname,
table_name=self.table_name,
embed_dim=embedding_dim,
docstore=MongoDocumentStore.from_uri(
uri=get_mongo_uri(),
db_name=f"docstore_{self.collection_name}",
namespace=self.platform_name,
),
vector_store=vector_store,
cache=IngestionCache(
cache=RedisCache.from_redis_client(self.redis_client),
collection=self.dbname + f"_{self.table_name}" + "_ingestion_cache",
collection=f"{self.collection_name}_{self.platform_name}_ingestion_cache",
docstore_strategy=DocstoreStrategy.UPSERTS,
),
docstore_strategy=DocstoreStrategy.UPSERTS,
)

nodes = pipeline.run(documents=docs, show_progress=True)

return nodes
try:
nodes = pipeline.run(documents=docs, show_progress=True)
return nodes
except Exception as e:
logging.error(
f"An error occurred while running the pipeline: {e}", exc_info=True
)
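
For reference, a hedged usage sketch of the reworked pipeline (the class and method signatures come from the diff above; the community id and document text are made up, and Qdrant, MongoDB, and Redis must still be reachable even when the mock embedding is used):

from llama_index.core import Document
from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline

# testing=True swaps CohereEmbedding for MockEmbedding, as in __init__ above.
pipeline = CustomIngestionPipeline(
    community_id="1234", collection_name="github", testing=True
)
docs = [Document(text="example issue body")]
# Semantic splitting, embedding, Redis-backed caching, Mongo docstore upserts,
# and writing vectors into the "1234_github" Qdrant collection.
nodes = pipeline.run_pipeline(docs=docs)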
10 changes: 6 additions & 4 deletions dags/hivemind_etl_helpers/notion_etl.py
@@ -1,7 +1,7 @@
import logging

from dags.hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline
from dags.hivemind_etl_helpers.src.db.notion.extractor import NotionExtractor
from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline
from hivemind_etl_helpers.src.db.notion.extractor import NotionExtractor


def process_notion_etl(
@@ -41,8 +41,10 @@ def process_notion_etl(
except TypeError as exp:
logging.info(f"No documents retrieved from notion! exp: {exp}")

table_name = "notion"
ingestion_pipeline = CustomIngestionPipeline(community_id, table_name=table_name)
collection_name = "notion"
ingestion_pipeline = CustomIngestionPipeline(
community_id, collection_name=collection_name
)
try:
ingestion_pipeline.run_pipeline(docs=documents)
except Exception as e:
22 changes: 16 additions & 6 deletions dags/hivemind_etl_helpers/src/utils/credentials.py
@@ -44,10 +44,20 @@ def load_redis_credentials() -> dict[str, str]:
"""
load_dotenv()

redis_creds: dict[str, str] = {}

redis_creds["port"] = os.getenv("REDIS_PORT", "")
redis_creds["password"] = os.getenv("REDIS_PASSWORD", "")
redis_creds["host"] = os.getenv("REDIS_HOST", "")

host = os.getenv("REDIS_HOST")
port = os.getenv("REDIS_PORT")
password = os.getenv("REDIS_PASSWORD")

if host is None:
raise ValueError("`REDIS_HOST` is not set in env credentials!")
if port is None:
raise ValueError("`REDIS_PORT` is not set in env credentials!")
if password is None:
raise ValueError("`REDIS_PASSWORD` is not set in env credentials!")

redis_creds: dict[str, str] = {
"host": host,
"port": port,
"password": password,
}
return redis_creds
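
The loader now fails fast when a variable is missing instead of silently returning empty strings. A small sketch, assuming the three Redis variables are exported (the values below are placeholders):

import os

os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("REDIS_PORT", "6379")
os.environ.setdefault("REDIS_PASSWORD", "example-password")

from hivemind_etl_helpers.src.utils.credentials import load_redis_credentials

creds = load_redis_credentials()  # raises ValueError if any of the three is unset
print(creds["host"], creds["port"])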
7 changes: 3 additions & 4 deletions dags/hivemind_etl_helpers/src/utils/mongo.py
@@ -1,5 +1,4 @@
import logging
from typing import Any

from pymongo import MongoClient

@@ -13,8 +12,7 @@ def __init__(self):
if MongoSingleton.__instance is not None:
raise Exception("This class is a singleton!")
else:
creds = load_mongo_credentials()
connection_uri = config_mogno_creds(creds)
connection_uri = get_mongo_uri()
self.client = MongoClient(connection_uri)
MongoSingleton.__instance = self

@@ -34,7 +32,8 @@ def get_client(self):
return self.client


def config_mogno_creds(mongo_creds: dict[str, Any]):
def get_mongo_uri() -> str:
mongo_creds = load_mongo_credentials()
user = mongo_creds["user"]
password = mongo_creds["password"]
host = mongo_creds["host"]
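
The remainder of get_mongo_uri is collapsed in this view. A plausible reconstruction, shown purely as an assumption rather than code from the PR, would assemble a standard connection string:

def get_mongo_uri_sketch(user: str, password: str, host: str, port: str) -> str:
    # Hypothetical reconstruction -- the actual URI format is not visible in the diff.
    return f"mongodb://{user}:{password}@{host}:{port}"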
(changed test file; file name not shown in this view)
@@ -1,8 +1,8 @@
from datetime import datetime
from unittest import TestCase

from dags.hivemind_etl_helpers.src.db.github.extract import GithubExtraction
from github.neo4j_storage.neo4j_connection import Neo4jConnection
from hivemind_etl_helpers.src.db.github.extract import GithubExtraction


class TestFetchCommits(TestCase):
(The remaining changed files are not shown in this view.)