diff --git a/dags/hivemind_etl_helpers/debugs.py b/dags/hivemind_etl_helpers/debugs.py
deleted file mode 100644
index 799bd95c..00000000
--- a/dags/hivemind_etl_helpers/debugs.py
+++ /dev/null
@@ -1,8 +0,0 @@
-import logging
-
-from tc_hivemind_backend.db.credentials import load_qdrant_credentials
-
-
-def pring_qdrant_creds():
-    qdrant_creds = load_qdrant_credentials()
-    logging.info(f"qdrant_creds: {qdrant_creds}")
diff --git a/dags/hivemind_etl_helpers/gdrive_etl.py b/dags/hivemind_etl_helpers/gdrive_etl.py
deleted file mode 100644
index 25a14686..00000000
--- a/dags/hivemind_etl_helpers/gdrive_etl.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import argparse
-import logging
-
-from hivemind_etl_helpers.src.db.gdrive.db_utils import setup_db
-from hivemind_etl_helpers.src.db.gdrive.retrieve_documents import (
-    retrieve_file_documents,
-    retrieve_folder_documents,
-)
-from hivemind_etl_helpers.src.document_node_parser import configure_node_parser
-from hivemind_etl_helpers.src.utils.check_documents import check_documents
-from tc_hivemind_backend.db.utils.model_hyperparams import load_model_hyperparams
-from tc_hivemind_backend.embeddings.cohere import CohereEmbedding
-from tc_hivemind_backend.pg_vector_access import PGVectorAccess
-
-
-def process_gdrive(
-    community_id: str, folder_id: str | None = None, file_ids: list[str] | None = None
-) -> None:
-    """
-    process the google drive files
-    and save the processed data within postgresql
-
-    Parameters
-    -----------
-    community_id : str
-        the community to save its data
-    folder_id : str | None
-        the folder id to process its data
-        default is None
-    file_ids : list[str] | None
-        the file ids to process their data
-        default is None
-
-    Note: One of `folder_id` or `file_ids` should be given.
-    """
-    chunk_size, embedding_dim = load_model_hyperparams()
-    table_name = "gdrive"
-    dbname = f"community_{community_id}"
-
-    if folder_id is None and file_ids is None:
-        raise ValueError("At least one of the `folder_id` or `file_ids` must be given!")
-
-    setup_db(community_id=community_id)
-
-    try:
-        documents = []
-        if folder_id is not None:
-            docs = retrieve_folder_documents(folder_id=folder_id)
-            documents.extend(docs)
-        if file_ids is not None:
-            docs = retrieve_file_documents(file_ids=file_ids)
-            documents.extend(docs)
-    except TypeError as exp:
-        logging.info(f"No documents retrieved from gdrive! exp: {exp}")
-
-    node_parser = configure_node_parser(chunk_size=chunk_size)
-    pg_vector = PGVectorAccess(table_name=table_name, dbname=dbname)
-
-    documents, doc_file_ids_to_delete = check_documents(
-        documents,
-        community_id,
-        identifier="file id",
-        date_field="modified at",
-        table_name=table_name,
-    )
-    # print("len(documents) to insert:", len(documents))
-    # print("doc_file_ids_to_delete:", doc_file_ids_to_delete)
-
-    # TODO: Delete the files with id `doc_file_ids_to_delete`
-
-    embed_model = CohereEmbedding()
-
-    pg_vector.save_documents_in_batches(
-        community_id=community_id,
-        documents=documents,
-        batch_size=100,
-        node_parser=node_parser,
-        max_request_per_minute=None,
-        embed_model=embed_model,
-        embed_dim=embedding_dim,
-    )
-
-
-if __name__ == "__main__":
-    logging.basicConfig()
-    logging.getLogger().setLevel(logging.INFO)
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "community_id", help="the community to save the gdrive data for it"
-    )
-
-    chinese_id = "17C7aXgnGa1v6C2wtS_tMDnaM4rBswCvO"
-    italian_id = "1Czy2LYRXfctHrAv-TKoxSTb6NeImky9P"
-    # portuguese_id = "1ptulSDDO1pB3f8REijnO9KlD1ytKOY2n"
-    # russian_id = "1PEufDX2C4JkeFunlKK1y7nug3LbWMHc5"
-    # spanish_id = "1Jrxuj92pKvGpggctKh2EjvzmfqDaoTZo"
-    turkish_id = "1PnzssIXt3FgDUbhX0dioTWhY1ByYLySD"
-    test_small_file_id = "115A_dZBXUZWvhos898kEoVKpZ2oyagyw"
-
-    args = parser.parse_args()
-    # process_gdrive(community_id=args.community_id, folder_id="***")
-    process_gdrive(
-        community_id=args.community_id,
-        file_ids=[chinese_id],
-        # folder_id="1IiGzpvnKSUje2jhXCEtWXINVqlcVmVst"
-    )
diff --git a/dags/hivemind_etl_helpers/ingestion_pipeline.py b/dags/hivemind_etl_helpers/ingestion_pipeline.py
index 2d8080b7..6b34988e 100644
--- a/dags/hivemind_etl_helpers/ingestion_pipeline.py
+++ b/dags/hivemind_etl_helpers/ingestion_pipeline.py
@@ -65,10 +65,6 @@ def run_pipeline(self, docs: list[Document]):
             docstore_strategy=DocstoreStrategy.UPSERTS,
         )
         logging.info("Pipeline created, now inserting documents into pipeline!")
-        try:
-            nodes = pipeline.run(documents=docs, show_progress=True)
-            return nodes
-        except Exception as e:
-            logging.error(
-                f"An error occurred while running the pipeline: {e}", exc_info=True
-            )
+
+        nodes = pipeline.run(documents=docs, show_progress=True)
+        return nodes
diff --git a/dags/hivemind_etl_helpers/mediawiki_etl.py b/dags/hivemind_etl_helpers/mediawiki_etl.py
index d61e8649..3041724e 100644
--- a/dags/hivemind_etl_helpers/mediawiki_etl.py
+++ b/dags/hivemind_etl_helpers/mediawiki_etl.py
@@ -25,18 +25,14 @@ def process_mediawiki_etl(
     """
     if page_titles is None:
         raise ValueError("The `page_titles` must be given!")
-    try:
-        mediawiki_extractor = MediaWikiExtractor(api_url)
-        documents = mediawiki_extractor.extract(
-            page_ids=page_titles,
-        )
-    except TypeError as exp:
-        logging.info(f"No documents retrieved from MediaWiki! exp: {exp}")
+
+    logging.info(f"Processing community id: {community_id}")
+    mediawiki_extractor = MediaWikiExtractor(api_url)
+    documents = mediawiki_extractor.extract(
+        page_ids=page_titles,
+    )
 
     ingestion_pipeline = CustomIngestionPipeline(
         community_id=community_id, collection_name="mediawiki"
     )
-    try:
-        ingestion_pipeline.run_pipeline(docs=documents)
-    except Exception as e:
-        logging.info(f"Error while trying to run MediaWikiIngestionPipeline! exp: {e}")
+    ingestion_pipeline.run_pipeline(docs=documents)
diff --git a/dags/hivemind_etl_helpers/notion_etl.py b/dags/hivemind_etl_helpers/notion_etl.py
index 00c88ef1..94714a5f 100644
--- a/dags/hivemind_etl_helpers/notion_etl.py
+++ b/dags/hivemind_etl_helpers/notion_etl.py
@@ -33,19 +33,12 @@ def process_notion_etl(
         raise ValueError(
            "At least one of the `database_ids` or `page_ids` must be given!"
         )
-    try:
-        notion_extractor = NotionExtractor(notion_token=access_token)
-        documents = notion_extractor.extract(
-            page_ids=page_ids, database_ids=database_ids
-        )
-    except TypeError as exp:
-        logging.info(f"No documents retrieved from notion! exp: {exp}")
+    logging.info(f"Processing community id: {community_id}")
+    notion_extractor = NotionExtractor(notion_token=access_token)
+    documents = notion_extractor.extract(page_ids=page_ids, database_ids=database_ids)
 
     collection_name = "notion"
     ingestion_pipeline = CustomIngestionPipeline(
         community_id, collection_name=collection_name
     )
-    try:
-        ingestion_pipeline.run_pipeline(docs=documents)
-    except Exception as e:
-        logging.info(f"Error while trying to run NotionIngestionPipeline! exp: {e}")
+    ingestion_pipeline.run_pipeline(docs=documents)
diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py
index aa9b88f8..d7bcaf5d 100644
--- a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py
+++ b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py
@@ -41,9 +41,5 @@ def extract_from_pages(self, pages: List[str]) -> List[Document]:
         Returns:
             List[Document]: A list of Document objects extracted from the specified pages.
         """
-        try:
-            response = self.wikimedia_reader.load_data(pages=pages)
-            return response
-        except Exception as e:
-            print(f"Failed to extract from pages {pages}: {str(e)}")
-            return []
+        response = self.wikimedia_reader.load_data(pages=pages)
+        return response
diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py
index 1c3598b2..8812e7e4 100644
--- a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py
+++ b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py
@@ -3,8 +3,8 @@
 from typing import Any, List, Optional
 
 import wikipedia
+from llama_index.core import Document
 from llama_index.legacy.readers.base import BasePydanticReader
-from llama_index.legacy.schema import Document
 
 
 class MediaWikiReader(BasePydanticReader):
diff --git a/dags/hivemind_etl_helpers/src/db/notion/extractor.py b/dags/hivemind_etl_helpers/src/db/notion/extractor.py
index 7b8908f8..73964d35 100644
--- a/dags/hivemind_etl_helpers/src/db/notion/extractor.py
+++ b/dags/hivemind_etl_helpers/src/db/notion/extractor.py
@@ -61,12 +61,8 @@ def extract_from_database(self, database_id: str) -> List[Document]:
         Returns:
             List[Document]: A list of Document objects extracted from the specified database.
         """
-        try:
-            response = self.notion_page_reader.load_data(database_id=database_id)
-            return response
-        except Exception as e:
-            print(f"Failed to extract from database {database_id}: {str(e)}")
-            return []
+        response = self.notion_page_reader.load_data(database_id=database_id)
+        return response
 
     def extract_from_pages(self, page_ids: List[str]) -> List[Document]:
         """
@@ -78,9 +74,5 @@ def extract_from_pages(self, page_ids: List[str]) -> List[Document]:
         Returns:
             List[Document]: A list of Document objects extracted from the specified pages.
         """
-        try:
-            response = self.notion_page_reader.load_data(page_ids=page_ids)
-            return response
-        except Exception as e:
-            print(f"Failed to extract from pages {page_ids}: {str(e)}")
-            return []
+        response = self.notion_page_reader.load_data(page_ids=page_ids)
+        return response
diff --git a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py
index c8da307b..6badf729 100644
--- a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py
+++ b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py
@@ -53,15 +53,3 @@ def test_handle_invalid_page_titles(self):
         documents = self.extractor.extract(page_ids=invalid_pages)
         self.assertEqual(len(documents), 0)
         self.mock_reader.load_data.assert_called_with(pages=invalid_pages)
-
-    def test_extract_from_valid_pages_with_exception(self):
-        """
-        Test extracting from valid pages with an exception occurring.
-        Expecting empty results.
-        """
-        test_pages = ["Python_(programming_language)"]
-        self.mock_reader.load_data.side_effect = Exception("Mocked exception")
-
-        documents = self.extractor.extract(page_ids=test_pages)
-        self.assertEqual(len(documents), 0)
-        self.mock_reader.load_data.assert_called_once_with(pages=test_pages)
diff --git a/dags/hivemind_mediawiki_etl.py b/dags/hivemind_mediawiki_etl.py
index 2dec6019..339ec73e 100644
--- a/dags/hivemind_mediawiki_etl.py
+++ b/dags/hivemind_mediawiki_etl.py
@@ -5,7 +5,6 @@
 from airflow import DAG
 from airflow.decorators import task
 
-from hivemind_etl_helpers.debugs import pring_qdrant_creds
 from hivemind_etl_helpers.mediawiki_etl import process_mediawiki_etl
 from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki
 
@@ -20,8 +19,6 @@ def get_mediawiki_communities() -> list[dict[str, str | list[str]]]:
         """
         Getting all communities having mediawiki from database
         """
-        # TODO: REMOVE!!!
-        pring_qdrant_creds()
         communities = ModulesMediaWiki().get_learning_platforms()
         return communities
 