From 3286acd092a302d4b4f3e1ff9e1123e52c44c0d3 Mon Sep 17 00:00:00 2001 From: polux0 Date: Fri, 17 May 2024 11:53:56 +0200 Subject: [PATCH 01/11] feat: `WikipediaReader` that accepts `api_url` -> `MediaWikiReader`; --- .../src/db/mediawiki/__init__.py | 0 .../db/mediawiki/custom_wikipedia_reader.py | 49 +++++++++++++++++++ .../src/db/mediawiki/test.py | 22 +++++++++ requirements.txt | 2 + 4 files changed, 73 insertions(+) create mode 100644 dags/hivemind_etl_helpers/src/db/mediawiki/__init__.py create mode 100644 dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py create mode 100644 dags/hivemind_etl_helpers/src/db/mediawiki/test.py diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/__init__.py b/dags/hivemind_etl_helpers/src/db/mediawiki/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py new file mode 100644 index 00000000..07998ce3 --- /dev/null +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py @@ -0,0 +1,49 @@ +"""Simple reader that reads mediawiki.""" + +from typing import Any, List, Optional + +from llama_index.legacy.readers.base import BasePydanticReader +from llama_index.legacy.schema import Document + + +class WikipediaReader(BasePydanticReader): + """Wikipedia reader. + + Reads a page. + + """ + + is_remote: bool = True + + def __init__(self, api_url: Optional[str] = None) -> None: + """Initialize with parameters.""" + try: + import wikipedia # noqa + except ImportError: + raise ImportError( + "`wikipedia` package not found, please run `pip install wikipedia`" + ) + + if api_url: + wikipedia.set_api_url(api_url) + + @classmethod + def class_name(cls) -> str: + return "WikipediaReader" + + def load_data(self, pages: List[str], **load_kwargs: Any) -> List[Document]: + """Load data from the input directory. 
+ + Args: + pages (List[str]): List of pages to read. + + """ + import wikipedia + + results = [] + for page in pages: + wiki_page = wikipedia.page(page, **load_kwargs) + page_content = wiki_page.content + page_id = wiki_page.pageid + results.append(Document(id_=page_id, text=page_content)) + return results diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/test.py b/dags/hivemind_etl_helpers/src/db/mediawiki/test.py new file mode 100644 index 00000000..29332aaa --- /dev/null +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/test.py @@ -0,0 +1,22 @@ +from typing import List +from custom_wikipedia_reader import WikipediaReader + + +# Define a function to test WikipediaReader with Wikivoyage +def test_wikipedia_reader_with_wikivoyage(api_url: str, pages: List[str]): + # Initialize the WikipediaReader with the given API URL + reader = WikipediaReader(api_url=api_url) + + # Load data for the given pages + documents = reader.load_data(pages) + + # Print out the results + for doc in documents: + print(f"Document ID: {doc.id_}") + print(f"Document Text: {doc.text[:500]}...") # Print first 500 characters for brevity + +# Example page title from Wikivoyage to test +pages_to_test = ["Serbia"] + +# Call the test function with the Wikivoyage API URL and the example page +test_wikipedia_reader_with_wikivoyage("https://en.wikivoyage.org/w/api.php", pages_to_test) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a7a7cd8b..2d42ac1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,3 +24,5 @@ llama-index-storage-docstore-postgres==0.1.3 llama-index-storage-docstore-redis==0.1.2 llama-index-vector-stores-postgres==0.1.7 redis===5.0.4 +git+https://github.com/polux0/Wikipedia +llama-index-readers-wikipedia===0.1.4 \ No newline at end of file From 01da4348d2b456e99a8869efe90a1505bca171dc Mon Sep 17 00:00:00 2001 From: polux0 Date: Mon, 20 May 2024 11:22:39 +0200 Subject: [PATCH 02/11] feat: minimal viable; --- 
dags/hivemind_etl_helpers/mediawiki_etl.py | 42 +++++ .../src/db/mediawiki/extractor.py | 66 ++++++++ ...ikipedia_reader.py => mediawiki_reader.py} | 6 +- .../src/db/mediawiki/test.py | 4 +- .../src/utils/modules/__init__.py | 1 + .../src/utils/modules/mediawiki.py | 47 +++++ .../integration/test_mediawiki_modules.py | 160 ++++++++++++++++++ .../tests/unit/test_mediawiki_extractor.py | 66 ++++++++ dags/hivemind_mediawiki_etl.py | 38 +++++ 9 files changed, 425 insertions(+), 5 deletions(-) create mode 100644 dags/hivemind_etl_helpers/mediawiki_etl.py create mode 100644 dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py rename dags/hivemind_etl_helpers/src/db/mediawiki/{custom_wikipedia_reader.py => mediawiki_reader.py} (94%) create mode 100644 dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py create mode 100644 dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py create mode 100644 dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py create mode 100644 dags/hivemind_mediawiki_etl.py diff --git a/dags/hivemind_etl_helpers/mediawiki_etl.py b/dags/hivemind_etl_helpers/mediawiki_etl.py new file mode 100644 index 00000000..4811850a --- /dev/null +++ b/dags/hivemind_etl_helpers/mediawiki_etl.py @@ -0,0 +1,42 @@ +import logging + +from dags.hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline +from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor + + +def process_mediawiki_etl( + community_id: str, + page_titles: list[str] | None = None, +) -> None: + """ + Process the MediaWiki pages or categories + and save the processed data within PostgreSQL + + Parameters + ----------- + community_id : str + the community to save its data + page_titles : list[str] | None + the page titles to process their data + default is None + + Note: The `page_titles` should be given. + """ + if page_titles is None: + raise ValueError( + "The `page_titles` must be given!" 
+ ) + try: + mediawiki_extractor = MediaWikiExtractor() + documents = mediawiki_extractor.extract( + page_titles=page_titles, + ) + except TypeError as exp: + logging.info(f"No documents retrieved from MediaWiki! exp: {exp}") + + table_name = "mediawiki" + ingestion_pipeline = CustomIngestionPipeline(community_id, table_name=table_name) + try: + ingestion_pipeline.run_pipeline(docs=documents) + except Exception as e: + logging.info(f"Error while trying to run MediaWikiIngestionPipeline! exp: {e}") diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py new file mode 100644 index 00000000..b61e4e44 --- /dev/null +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py @@ -0,0 +1,66 @@ +import os +from typing import List, Optional + +from dotenv import load_dotenv +from llama_index.core import Document + +from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader + + +class MediaWikiExtractor: + def __init__(self, api_url: Optional[str] = "https://en.wikipedia.org/w/api.php"): + """ + Initializes the MediaWikiExtractor with an API URL for Wikimedia. + If no URL is provided, it tries to load it from an environment variable. + + Args: + api_url (Optional[str]): The Wikimedia API URL. + If None, the URL is loaded from the 'WIKIMEDIA_API_URL' environment variable. + """ + if api_url is None: + load_dotenv() + self.api_url = os.getenv("WIKIMEDIA_API_URL") + else: + self.api_url = api_url + + self.wikimedia_reader = MediaWikiReader(api_url=self.api_url) + + def extract(self, pages: Optional[List[str]] = None) -> List[Document]: + """ + Extracts documents from Wikimedia pages specified by their titles. + + Args: + pages (Optional[List[str]]): List of page titles to extract documents from. + + Returns: + List[Document]: A list of Document objects extracted from the specified Wikimedia pages. 
+ """ + if pages: + return self.extract_from_pages(pages) + return [] + + def extract_from_pages(self, pages: List[str]) -> List[Document]: + """ + Extracts documents from specific Wikimedia pages by their titles. + + Args: + pages (List[str]): The list of page titles to extract documents from. + + Returns: + List[Document]: A list of Document objects extracted from the specified pages. + """ + try: + response = self.wikimedia_reader.load_data(pages=pages) + return response + except Exception as e: + print(f"Failed to extract from pages {pages}: {str(e)}") + return [] + + +# # Example usage +# if __name__ == "__main__": +# # Example of initializing and using the extractor +# extractor = MediaWikiExtractor(api_url="https://en.wikipedia.org/w/api.php") +# documents = extractor.extract(pages=["Python (programming language)", "OpenAI"]) +# for doc in documents: +# print(doc) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py similarity index 94% rename from dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py rename to dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py index 07998ce3..d4a054a8 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/custom_wikipedia_reader.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py @@ -6,8 +6,8 @@ from llama_index.legacy.schema import Document -class WikipediaReader(BasePydanticReader): - """Wikipedia reader. +class MediaWikiReader(BasePydanticReader): + """WikiMedia reader. Reads a page. 
@@ -23,7 +23,7 @@ def __init__(self, api_url: Optional[str] = None) -> None: raise ImportError( "`wikipedia` package not found, please run `pip install wikipedia`" ) - + if api_url: wikipedia.set_api_url(api_url) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/test.py b/dags/hivemind_etl_helpers/src/db/mediawiki/test.py index 29332aaa..6c250c6b 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/test.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/test.py @@ -1,11 +1,11 @@ from typing import List -from custom_wikipedia_reader import WikipediaReader +from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader # Define a function to test WikipediaReader with Wikivoyage def test_wikipedia_reader_with_wikivoyage(api_url: str, pages: List[str]): # Initialize the WikipediaReader with the given API URL - reader = WikipediaReader(api_url=api_url) + reader = MediaWikiReader(api_url=api_url) # Load data for the given pages documents = reader.load_data(pages) diff --git a/dags/hivemind_etl_helpers/src/utils/modules/__init__.py b/dags/hivemind_etl_helpers/src/utils/modules/__init__.py index 41de934a..89c563a4 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/__init__.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/__init__.py @@ -3,4 +3,5 @@ from .discourse import ModulesDiscourse from .gdrive import ModulesGDrive from .github import ModulesGitHub +from .mediawiki import ModulesMediaWiki from .notion import ModulesNotion diff --git a/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py new file mode 100644 index 00000000..91fdb78f --- /dev/null +++ b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py @@ -0,0 +1,47 @@ +from .modules_base import ModulesBase + + +class ModulesMediaWiki(ModulesBase): + def __init__(self) -> None: + self.platform_name = "mediawiki" + super().__init__() + + def get_learning_platforms( + self, + ) -> list[dict[str, str | 
list[str]]]: + """ + Get all the MediaWiki communities with their page titles. + + Returns + --------- + community_orgs : list[dict[str, str | list[str]]] = [] + a list of MediaWiki data information + + example data output: + ``` + [{ + "community_id": "6579c364f1120850414e0dc5", + "page_titles": ["Main_Page", "Default_Page"], + }] + ``` + """ + modules = self.query(platform=self.platform_name, projection={"name": 0}) + communities_data: list[dict[str, str | list[str]]] = [] + + for module in modules: + community = module["community"] + + # each platform of the community + for platform in module["options"]["platforms"]: + if platform["name"] != self.platform_name: + continue + + modules_options = platform["metadata"] + communities_data.append( + { + "community_id": str(community), + "page_titles": modules_options.get("page_titles", []), + } + ) + + return communities_data diff --git a/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py new file mode 100644 index 00000000..c9181c0e --- /dev/null +++ b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py @@ -0,0 +1,160 @@ +from datetime import datetime +from unittest import TestCase +from bson import ObjectId +from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki +from hivemind_etl_helpers.src.utils.mongo import MongoSingleton + + +class TestGetMediaWikiModules(TestCase): + def setUp(self): + client = MongoSingleton.get_instance().client + client["Core"].drop_collection("modules") + client["Core"].drop_collection("platforms") + self.client = client + self.modules_mediawiki = ModulesMediaWiki() + + def test_get_empty_data(self): + result = self.modules_mediawiki.get_learning_platforms() + self.assertEqual(result, []) + + def test_get_single_data(self): + platform_id = ObjectId("6579c364f1120850414e0dc6") + community_id = ObjectId("6579c364f1120850414e0dc5") + + 
self.client["Core"]["platforms"].insert_one( + { + "_id": platform_id, + "name": "mediawiki", + "metadata": {}, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + self.client["Core"]["modules"].insert_one( + { + "name": "hivemind", + "community": community_id, + "options": { + "platforms": [ + { + "platform": platform_id, + "name": "mediawiki", + "metadata": { + "page_titles": [ + "Main_Page", + "Help:Contents", + "Sandbox", + ], + }, + } + ] + }, + } + ) + + result = self.modules_mediawiki.get_learning_platforms() + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["community_id"], "6579c364f1120850414e0dc5") + self.assertEqual( + result[0]["page_titles"], + [ + "Main_Page", + "Help:Contents", + "Sandbox", + ], + ) + + def test_get_mediawiki_communities_data_multiple_platforms(self): + """ + Two mediawiki platforms for one community + """ + platform_id1 = ObjectId("6579c364f1120850414e0dc6") + platform_id2 = ObjectId("6579c364f1120850414e0dc7") + community_id = ObjectId("1009c364f1120850414e0dc5") + + self.client["Core"]["modules"].insert_one( + { + "name": "hivemind", + "community": community_id, + "options": { + "platforms": [ + { + "platform": platform_id1, + "name": "mediawiki", + "metadata": { + "page_titles": [ + "Main_Page", + "Help:Contents", + ], + }, + }, + { + "platform": platform_id2, + "name": "mediawiki", + "metadata": { + "page_titles": [ + "Sandbox", + "Wikipedia:About", + ], + }, + }, + ] + }, + } + ) + + self.client["Core"]["platforms"].insert_one( + { + "_id": platform_id1, + "name": "mediawiki", + "metadata": {}, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + + self.client["Core"]["platforms"].insert_one( + { + "_id": platform_id2, + "name": "mediawiki", + "metadata": {}, + 
"community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + + result = self.modules_mediawiki.get_learning_platforms() + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 2) + self.assertEqual( + result[0], + { + "community_id": str(community_id), + "page_titles": [ + "Main_Page", + "Help:Contents", + ], + }, + ) + self.assertEqual( + result[1], + { + "community_id": str(community_id), + "page_titles": [ + "Sandbox", + "Wikipedia:About", + ], + }, + ) diff --git a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py new file mode 100644 index 00000000..847e0343 --- /dev/null +++ b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py @@ -0,0 +1,66 @@ +import unittest +from unittest.mock import Mock +from llama_index.core import Document +from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor + + +class TestMediaWikiExtractor(unittest.TestCase): + def setUp(self): + """ + Setup for each test case with a direct Mock of MediaWikiReader. + """ + self.api_url = "https://en.wikipedia.org/w/api.php" + self.extractor = MediaWikiExtractor(api_url=self.api_url) + self.mock_reader = Mock(spec=self.extractor.wikimedia_reader) + self.extractor.wikimedia_reader = self.mock_reader + + def test_initialization_with_api_url(self): + """ + Test that the extractor is initialized with the correct API URL. + """ + self.assertEqual(self.extractor.api_url, self.api_url) + + def test_extract_from_valid_pages(self): + """ + Test extracting from valid pages. 
+ """ + mock_response = [Mock(spec=Document), Mock(spec=Document)] + self.mock_reader.load_data.return_value = mock_response + + test_pages = ["Python_(programming_language)", "OpenAI"] + documents = self.extractor.extract(pages=test_pages) + self.assertEqual(len(documents), len(mock_response)) + self.mock_reader.load_data.assert_called_once_with(pages=test_pages) + + def test_extract_no_pages(self): + """ + Test extracting with no pages provided. + Expecting empty results. + """ + documents = self.extractor.extract() + self.assertEqual(len(documents), 0) + self.mock_reader.load_data.assert_not_called() + + def test_handle_invalid_page_titles(self): + """ + Test handling of invalid page titles. + Expecting empty results. + """ + invalid_pages = ["Non_existent_page"] + self.mock_reader.load_data.return_value = [] + + documents = self.extractor.extract(pages=invalid_pages) + self.assertEqual(len(documents), 0) + self.mock_reader.load_data.assert_called_with(pages=invalid_pages) + + def test_extract_from_valid_pages_with_exception(self): + """ + Test extracting from valid pages with an exception occurring. + Expecting empty results. 
+ """ + test_pages = ["Python_(programming_language)"] + self.mock_reader.load_data.side_effect = Exception("Mocked exception") + + documents = self.extractor.extract(pages=test_pages) + self.assertEqual(len(documents), 0) + self.mock_reader.load_data.assert_called_once_with(pages=test_pages) diff --git a/dags/hivemind_mediawiki_etl.py b/dags/hivemind_mediawiki_etl.py new file mode 100644 index 00000000..831b2191 --- /dev/null +++ b/dags/hivemind_mediawiki_etl.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import logging +from datetime import datetime + +from airflow import DAG +from airflow.decorators import task +from dags.hivemind_etl_helpers.mediawiki_etl import process_mediawiki_etl +from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki + +with DAG( + dag_id="mediawiki_vector_store_update", + start_date=datetime(2024, 2, 21), + schedule_interval="0 4 * * *", +) as dag: + + @task + def get_mediawiki_communities() -> list[dict[str, str | list[str]]]: + """ + Getting all communities having mediawiki from database + """ + communities = ModulesMediaWiki().get_learning_platforms() + return communities + + @task + def start_mediawiki_vectorstore(community_info: dict[str, str | list[str]]): + community_id = community_info["community_id"] + page_titles = community_info["page_titles"] + + logging.info(f"Working on community, {community_id}") + process_mediawiki_etl( + community_id=community_id, # type: ignore + page_ids=page_titles, # type: ignore + ) + logging.info(f"Community {community_id} Job finished!") + + communities_info = get_mediawiki_communities() + start_mediawiki_vectorstore.expand(community_info=communities_info) From b183b2fa6c9761d80531921a4105d5beea284bd8 Mon Sep 17 00:00:00 2001 From: polux0 Date: Tue, 21 May 2024 09:59:44 +0200 Subject: [PATCH 03/11] fix: modifications in accordance with changes requested & fixing linting issues; --- dags/hivemind_etl_helpers/mediawiki_etl.py | 9 ++++---- .../src/db/mediawiki/extractor.py | 
22 ++----------------- .../src/db/mediawiki/mediawiki_reader.py | 8 +------ .../src/db/mediawiki/test.py | 22 ------------------- .../src/utils/modules/mediawiki.py | 6 +++-- .../integration/test_mediawiki_modules.py | 7 ++++++ .../tests/unit/test_mediawiki_extractor.py | 3 ++- requirements.txt | 3 +-- 8 files changed, 21 insertions(+), 59 deletions(-) delete mode 100644 dags/hivemind_etl_helpers/src/db/mediawiki/test.py diff --git a/dags/hivemind_etl_helpers/mediawiki_etl.py b/dags/hivemind_etl_helpers/mediawiki_etl.py index 4811850a..d1d2f7f8 100644 --- a/dags/hivemind_etl_helpers/mediawiki_etl.py +++ b/dags/hivemind_etl_helpers/mediawiki_etl.py @@ -6,7 +6,8 @@ def process_mediawiki_etl( community_id: str, - page_titles: list[str] | None = None, + api_url: str, + page_titles: list[str], ) -> None: """ Process the MediaWiki pages or categories @@ -23,11 +24,9 @@ def process_mediawiki_etl( Note: The `page_titles` should be given. """ if page_titles is None: - raise ValueError( - "The `page_titles` must be given!" - ) + raise ValueError("The `page_titles` must be given!") try: - mediawiki_extractor = MediaWikiExtractor() + mediawiki_extractor = MediaWikiExtractor(api_url) documents = mediawiki_extractor.extract( page_titles=page_titles, ) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py index b61e4e44..705cae32 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py @@ -1,10 +1,6 @@ -import os from typing import List, Optional - -from dotenv import load_dotenv -from llama_index.core import Document - from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader +from llama_index.core import Document class MediaWikiExtractor: @@ -17,12 +13,7 @@ def __init__(self, api_url: Optional[str] = "https://en.wikipedia.org/w/api.php" api_url (Optional[str]): The Wikimedia API URL. 
If None, the URL is loaded from the 'WIKIMEDIA_API_URL' environment variable. """ - if api_url is None: - load_dotenv() - self.api_url = os.getenv("WIKIMEDIA_API_URL") - else: - self.api_url = api_url - + self.api_url = api_url self.wikimedia_reader = MediaWikiReader(api_url=self.api_url) def extract(self, pages: Optional[List[str]] = None) -> List[Document]: @@ -55,12 +46,3 @@ def extract_from_pages(self, pages: List[str]) -> List[Document]: except Exception as e: print(f"Failed to extract from pages {pages}: {str(e)}") return [] - - -# # Example usage -# if __name__ == "__main__": -# # Example of initializing and using the extractor -# extractor = MediaWikiExtractor(api_url="https://en.wikipedia.org/w/api.php") -# documents = extractor.extract(pages=["Python (programming language)", "OpenAI"]) -# for doc in documents: -# print(doc) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py index d4a054a8..9de7121e 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py @@ -4,6 +4,7 @@ from llama_index.legacy.readers.base import BasePydanticReader from llama_index.legacy.schema import Document +import wikipedia class MediaWikiReader(BasePydanticReader): @@ -17,13 +18,6 @@ class MediaWikiReader(BasePydanticReader): def __init__(self, api_url: Optional[str] = None) -> None: """Initialize with parameters.""" - try: - import wikipedia # noqa - except ImportError: - raise ImportError( - "`wikipedia` package not found, please run `pip install wikipedia`" - ) - if api_url: wikipedia.set_api_url(api_url) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/test.py b/dags/hivemind_etl_helpers/src/db/mediawiki/test.py deleted file mode 100644 index 6c250c6b..00000000 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/test.py +++ /dev/null @@ -1,22 +0,0 @@ -from typing import List - -from 
hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader - -# Define a function to test WikipediaReader with Wikivoyage -def test_wikipedia_reader_with_wikivoyage(api_url: str, pages: List[str]): - # Initialize the WikipediaReader with the given API URL - reader = MediaWikiReader(api_url=api_url) - - # Load data for the given pages - documents = reader.load_data(pages) - - # Print out the results - for doc in documents: - print(f"Document ID: {doc.id_}") - print(f"Document Text: {doc.text[:500]}...") # Print first 500 characters for brevity - -# Example page title from Wikivoyage to test -pages_to_test = ["Serbia"] - -# Call the test function with the Wikivoyage API URL and the example page -test_wikipedia_reader_with_wikivoyage("https://en.wikivoyage.org/w/api.php", pages_to_test) \ No newline at end of file diff --git a/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py index 91fdb78f..1a6f3b98 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py @@ -20,8 +20,9 @@ def get_learning_platforms( example data output: ``` [{ - "community_id": "6579c364f1120850414e0dc5", - "page_titles": ["Main_Page", "Default_Page"], + "community_id": "6579c364f1120850414e0dc5", + "page_titles": ["Main_Page", "Default_Page"], + "base_url": "some_api_url", }] ``` """ @@ -41,6 +42,7 @@ def get_learning_platforms( { "community_id": str(community), "page_titles": modules_options.get("page_titles", []), + "base_url": modules_options.get("api_url"), } ) diff --git a/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py index c9181c0e..759b1758 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py @@ -1,5 +1,6 @@ from datetime import 
datetime from unittest import TestCase + from bson import ObjectId from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki from hivemind_etl_helpers.src.utils.mongo import MongoSingleton @@ -48,6 +49,7 @@ def test_get_single_data(self): "Help:Contents", "Sandbox", ], + "api_url": "http://example.com/api", }, } ] @@ -68,6 +70,7 @@ def test_get_single_data(self): "Sandbox", ], ) + self.assertEqual(result[0]["base_url"], "http://example.com/api") def test_get_mediawiki_communities_data_multiple_platforms(self): """ @@ -91,6 +94,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): "Main_Page", "Help:Contents", ], + "api_url": "http://example1.com/api", }, }, { @@ -101,6 +105,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): "Sandbox", "Wikipedia:About", ], + "api_url": "http://example2.com/api", }, }, ] @@ -146,6 +151,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): "Main_Page", "Help:Contents", ], + "base_url": "http://example1.com/api", }, ) self.assertEqual( @@ -156,5 +162,6 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): "Sandbox", "Wikipedia:About", ], + "base_url": "http://example2.com/api", }, ) diff --git a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py index 847e0343..6bbdf415 100644 --- a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py +++ b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py @@ -1,7 +1,8 @@ import unittest from unittest.mock import Mock -from llama_index.core import Document + from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor +from llama_index.core import Document class TestMediaWikiExtractor(unittest.TestCase): diff --git a/requirements.txt b/requirements.txt index 2d42ac1e..e65e7183 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,5 +24,4 @@ llama-index-storage-docstore-postgres==0.1.3 
llama-index-storage-docstore-redis==0.1.2 llama-index-vector-stores-postgres==0.1.7 redis===5.0.4 -git+https://github.com/polux0/Wikipedia -llama-index-readers-wikipedia===0.1.4 \ No newline at end of file +git+https://github.com/polux0/Wikipedia \ No newline at end of file From 68f7765b5a89d98b4ac26391db526e79ba01d897 Mon Sep 17 00:00:00 2001 From: polux0 Date: Tue, 21 May 2024 10:14:23 +0200 Subject: [PATCH 04/11] fix: linting issues; --- dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py | 1 + .../hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py | 2 +- dags/hivemind_mediawiki_etl.py | 4 +++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py index 705cae32..45c264db 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py @@ -1,4 +1,5 @@ from typing import List, Optional + from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader from llama_index.core import Document diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py index 9de7121e..4f505c9e 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py @@ -2,9 +2,9 @@ from typing import Any, List, Optional +import wikipedia from llama_index.legacy.readers.base import BasePydanticReader from llama_index.legacy.schema import Document -import wikipedia class MediaWikiReader(BasePydanticReader): diff --git a/dags/hivemind_mediawiki_etl.py b/dags/hivemind_mediawiki_etl.py index 831b2191..362d4164 100644 --- a/dags/hivemind_mediawiki_etl.py +++ b/dags/hivemind_mediawiki_etl.py @@ -26,11 +26,13 @@ def get_mediawiki_communities() -> list[dict[str, str | list[str]]]: def start_mediawiki_vectorstore(community_info: 
dict[str, str | list[str]]): community_id = community_info["community_id"] page_titles = community_info["page_titles"] + api_url = community_info["base_url"] logging.info(f"Working on community, {community_id}") process_mediawiki_etl( community_id=community_id, # type: ignore - page_ids=page_titles, # type: ignore + page_titles=page_titles, # type: ignore + api_url=api_url, # type: ignore ) logging.info(f"Community {community_id} Job finished!") From 9329493c13902e36738af5cbe7eecff678af2834 Mon Sep 17 00:00:00 2001 From: polux0 Date: Tue, 21 May 2024 10:39:03 +0200 Subject: [PATCH 05/11] fix: trying to fix `mypy` ( linter ) issues; --- dags/hivemind_etl_helpers/mediawiki_etl.py | 2 +- .../hivemind_etl_helpers/src/db/mediawiki/extractor.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dags/hivemind_etl_helpers/mediawiki_etl.py b/dags/hivemind_etl_helpers/mediawiki_etl.py index d1d2f7f8..55ddbed2 100644 --- a/dags/hivemind_etl_helpers/mediawiki_etl.py +++ b/dags/hivemind_etl_helpers/mediawiki_etl.py @@ -28,7 +28,7 @@ def process_mediawiki_etl( try: mediawiki_extractor = MediaWikiExtractor(api_url) documents = mediawiki_extractor.extract( - page_titles=page_titles, + page_ids=page_titles, ) except TypeError as exp: logging.info(f"No documents retrieved from MediaWiki! 
exp: {exp}") diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py index 45c264db..aa9b88f8 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py @@ -17,18 +17,18 @@ def __init__(self, api_url: Optional[str] = "https://en.wikipedia.org/w/api.php" self.api_url = api_url self.wikimedia_reader = MediaWikiReader(api_url=self.api_url) - def extract(self, pages: Optional[List[str]] = None) -> List[Document]: + def extract(self, page_ids: Optional[List[str]] = None) -> List[Document]: """ - Extracts documents from Wikimedia pages specified by their titles. + Extracts documents from Wikimedia page_ids (their titles). Args: - pages (Optional[List[str]]): List of page titles to extract documents from. + pages (Optional[List[str]]): List of page_ids to extract documents from. Returns: List[Document]: A list of Document objects extracted from the specified Wikimedia pages. 
""" - if pages: - return self.extract_from_pages(pages) + if page_ids: + return self.extract_from_pages(page_ids) return [] def extract_from_pages(self, pages: List[str]) -> List[Document]: From 16ac67d53a15481fecb6a2fea08a72b2afb54b26 Mon Sep 17 00:00:00 2001 From: polux0 Date: Tue, 21 May 2024 10:47:45 +0200 Subject: [PATCH 06/11] fix: resolved issues with tests; --- .../tests/unit/test_mediawiki_extractor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py index 6bbdf415..de96ce85 100644 --- a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py +++ b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py @@ -29,7 +29,7 @@ def test_extract_from_valid_pages(self): self.mock_reader.load_data.return_value = mock_response test_pages = ["Python_(programming_language)", "OpenAI"] - documents = self.extractor.extract(pages=test_pages) + documents = self.extractor.extract(page_ids=test_pages) self.assertEqual(len(documents), len(mock_response)) self.mock_reader.load_data.assert_called_once_with(pages=test_pages) @@ -50,7 +50,7 @@ def test_handle_invalid_page_titles(self): invalid_pages = ["Non_existent_page"] self.mock_reader.load_data.return_value = [] - documents = self.extractor.extract(pages=invalid_pages) + documents = self.extractor.extract(page_ids=invalid_pages) self.assertEqual(len(documents), 0) self.mock_reader.load_data.assert_called_with(pages=invalid_pages) @@ -62,6 +62,6 @@ def test_extract_from_valid_pages_with_exception(self): test_pages = ["Python_(programming_language)"] self.mock_reader.load_data.side_effect = Exception("Mocked exception") - documents = self.extractor.extract(pages=test_pages) + documents = self.extractor.extract(page_ids=test_pages) self.assertEqual(len(documents), 0) self.mock_reader.load_data.assert_called_once_with(pages=test_pages) From 
dc7cb56ccab88f76fd5ab35966e7b388b353c982 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 21 May 2024 12:44:35 +0330 Subject: [PATCH 07/11] fix: usage of custom ingestion pipeline! + On main branch we were using the qdrant db and we updated mediawiki_etl with the latest updates on CustomIngestionPipeline --- dags/hivemind_etl_helpers/mediawiki_etl.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dags/hivemind_etl_helpers/mediawiki_etl.py b/dags/hivemind_etl_helpers/mediawiki_etl.py index 55ddbed2..66dbae58 100644 --- a/dags/hivemind_etl_helpers/mediawiki_etl.py +++ b/dags/hivemind_etl_helpers/mediawiki_etl.py @@ -33,8 +33,9 @@ def process_mediawiki_etl( except TypeError as exp: logging.info(f"No documents retrieved from MediaWiki! exp: {exp}") - table_name = "mediawiki" - ingestion_pipeline = CustomIngestionPipeline(community_id, table_name=table_name) + ingestion_pipeline = CustomIngestionPipeline( + community_id=community_id, collection_name="mediawiki" + ) try: ingestion_pipeline.run_pipeline(docs=documents) except Exception as e: From f6bf458706f64bbc1923ca2d2922328738c0fe38 Mon Sep 17 00:00:00 2001 From: polux0 Date: Tue, 21 May 2024 14:13:20 +0200 Subject: [PATCH 08/11] fix: adding tc-wikipedia-lib==1.4.0 to requirements.txt; --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 044a05f7..2460fc15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,4 +24,4 @@ llama-index-readers-google==0.2.5 llama-index-storage-docstore-redis==0.1.2 llama-index-storage-docstore-mongodb==0.1.3 redis===5.0.4 -git+https://github.com/polux0/Wikipedia \ No newline at end of file +tc-wikipedia-lib==1.4.0 \ No newline at end of file From 83237c65efcda221a7923300302e6f7105959a3f Mon Sep 17 00:00:00 2001 From: polux0 Date: Tue, 21 May 2024 14:43:35 +0200 Subject: [PATCH 09/11] fix: added `tc-wikipedia-lib==1.0.0` to `requirements.txt`; --- requirements.txt | 2 +- 1 file
changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 2460fc15..7689fa21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,4 +24,4 @@ llama-index-readers-google==0.2.5 llama-index-storage-docstore-redis==0.1.2 llama-index-storage-docstore-mongodb==0.1.3 redis===5.0.4 -tc-wikipedia-lib==1.4.0 \ No newline at end of file +tc-wikipedia-lib==1.0.0 From 09eb552198804f9b75347c8276e92c35e02f4f77 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 21 May 2024 17:34:42 +0330 Subject: [PATCH 10/11] feat: added url in llama-index document metadata! In order to be able to debug easily in future. --- .../src/db/mediawiki/mediawiki_reader.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py index 4f505c9e..1c3598b2 100644 --- a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py @@ -39,5 +39,14 @@ def load_data(self, pages: List[str], **load_kwargs: Any) -> List[Document]: wiki_page = wikipedia.page(page, **load_kwargs) page_content = wiki_page.content page_id = wiki_page.pageid - results.append(Document(id_=page_id, text=page_content)) + doc = Document( + id_=page_id, + text=page_content, + metadata={ + "url": wiki_page.url, + }, + excluded_embed_metadata_keys=["url"], + excluded_llm_metadata_keys=["url"], + ) + results.append(doc) return results From d8f3975f98b0a317ffc8dd57d3ed99065cf340f9 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 21 May 2024 17:59:03 +0330 Subject: [PATCH 11/11] fix: update to the latest db structure! modules and platform structure needed to be updated with latest structure. 
--- .../src/utils/modules/mediawiki.py | 6 +++- .../src/utils/modules/modules_base.py | 17 +++++---- .../integration/test_mediawiki_modules.py | 35 ++++++++++--------- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py index 1a6f3b98..92c9f6e1 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py @@ -37,11 +37,15 @@ def get_learning_platforms( if platform["name"] != self.platform_name: continue + page_ids = self.get_platform_metadata( + platform_id=platform["platform"], + metadata_name="pageIds", + ) modules_options = platform["metadata"] communities_data.append( { "community_id": str(community), - "page_titles": modules_options.get("page_titles", []), + "page_titles": page_ids, "base_url": modules_options.get("api_url"), } ) diff --git a/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py b/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py index 440deacd..5b2197f6 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py @@ -76,10 +76,11 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str: """ client = MongoSingleton.get_instance().client - user = self._get_platform_userid(platform_id) + user_id = self.get_platform_metadata(platform_id, "userId") + user_id = ObjectId(user_id) token_doc = client["Core"]["tokens"].find_one( { - "user": user, + "user": user_id, "type": token_type, }, { @@ -89,13 +90,13 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str: ) if token_doc is None: raise ValueError( - f"No Token for the given user {user} " + f"No Token for the given user {user_id} " "in tokens collection of the Core database!" 
) token = token_doc["token"] return token - def _get_platform_userid(self, platform_id: ObjectId) -> str: + def get_platform_metadata(self, platform_id: ObjectId, metadata_name: str) -> str: """ get the userid that belongs to a platform @@ -103,6 +104,8 @@ def _get_platform_userid(self, platform_id: ObjectId) -> str: ----------- platform_id : bson.ObjectId the platform id we need their owner user id + metadata_name : str + a specific field of metadata that we want Returns --------- @@ -116,11 +119,11 @@ def _get_platform_userid(self, platform_id: ObjectId) -> str: "_id": platform_id, }, { - "metadata.userId": 1, + f"metadata.{metadata_name}": 1, }, ) if platform is None: raise ValueError(f"No platform available given platform id: {platform_id}") - user_id = platform["metadata"]["userId"] - return ObjectId(user_id) + metadata_field = platform["metadata"][metadata_name] + return metadata_field diff --git a/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py index 759b1758..93d247c9 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py @@ -26,7 +26,13 @@ def test_get_single_data(self): { "_id": platform_id, "name": "mediawiki", - "metadata": {}, + "metadata": { + "pageIds": [ + "Main_Page", + "Help:Contents", + "Sandbox", + ], + }, "community": community_id, "disconnectedAt": None, "connectedAt": datetime.now(), @@ -44,11 +50,6 @@ def test_get_single_data(self): "platform": platform_id, "name": "mediawiki", "metadata": { - "page_titles": [ - "Main_Page", - "Help:Contents", - "Sandbox", - ], "api_url": "http://example.com/api", }, } @@ -90,10 +91,6 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): "platform": platform_id1, "name": "mediawiki", "metadata": { - "page_titles": [ - "Main_Page", - "Help:Contents", - ], "api_url": "http://example1.com/api", }, }, 
@@ -101,10 +98,6 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): "platform": platform_id2, "name": "mediawiki", "metadata": { - "page_titles": [ - "Sandbox", - "Wikipedia:About", - ], "api_url": "http://example2.com/api", }, }, @@ -117,7 +110,12 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): { "_id": platform_id1, "name": "mediawiki", - "metadata": {}, + "metadata": { + "pageIds": [ + "Main_Page", + "Help:Contents", + ], + }, "community": community_id, "disconnectedAt": None, "connectedAt": datetime.now(), @@ -130,7 +128,12 @@ def test_get_mediawiki_communities_data_multiple_platforms(self): { "_id": platform_id2, "name": "mediawiki", - "metadata": {}, + "metadata": { + "pageIds": [ + "Sandbox", + "Wikipedia:About", + ], + }, "community": community_id, "disconnectedAt": None, "connectedAt": datetime.now(),