diff --git a/dags/hivemind_etl_helpers/mediawiki_etl.py b/dags/hivemind_etl_helpers/mediawiki_etl.py
new file mode 100644
index 00000000..66dbae58
--- /dev/null
+++ b/dags/hivemind_etl_helpers/mediawiki_etl.py
@@ -0,0 +1,47 @@
+import logging
+
+from dags.hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline
+from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor
+
+
+def process_mediawiki_etl(
+    community_id: str,
+    api_url: str,
+    page_titles: list[str],
+) -> None:
+    """
+    Process the MediaWiki pages and save
+    the processed data into the vector store
+
+    Parameters
+    -----------
+    community_id : str
+        the community to save its data
+    api_url : str
+        the MediaWiki API endpoint to fetch the pages from
+    page_titles : list[str]
+        the titles of the pages to process
+
+    Note: `page_titles` must be given.
+    """
+    if page_titles is None:
+        raise ValueError("The `page_titles` must be given!")
+
+    documents = []
+    try:
+        mediawiki_extractor = MediaWikiExtractor(api_url)
+        documents = mediawiki_extractor.extract(
+            page_ids=page_titles,
+        )
+    except TypeError as exp:
+        logging.error(f"No documents retrieved from MediaWiki! exp: {exp}")
+
+    ingestion_pipeline = CustomIngestionPipeline(
+        community_id=community_id, collection_name="mediawiki"
+    )
+    try:
+        ingestion_pipeline.run_pipeline(docs=documents)
+    except Exception as exp:
+        logging.error(
+            f"Error while trying to run the MediaWiki ingestion pipeline! exp: {exp}"
+        )
diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/__init__.py b/dags/hivemind_etl_helpers/src/db/mediawiki/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py
new file mode 100644
index 00000000..aa9b88f8
--- /dev/null
+++ b/dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py
@@ -0,0 +1,49 @@
+import logging
+from typing import List, Optional
+
+from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader
+from llama_index.core import Document
+
+
+class MediaWikiExtractor:
+    def __init__(self, api_url: Optional[str] = "https://en.wikipedia.org/w/api.php"):
+        """
+        Initialize the MediaWikiExtractor with the MediaWiki API URL to read from.
+
+        Args:
+            api_url (Optional[str]): The MediaWiki API URL.
+                Defaults to the English Wikipedia endpoint.
+        """
+        self.api_url = api_url
+        self.wikimedia_reader = MediaWikiReader(api_url=self.api_url)
+
+    def extract(self, page_ids: Optional[List[str]] = None) -> List[Document]:
+        """
+        Extract documents for the given MediaWiki page titles.
+
+        Args:
+            page_ids (Optional[List[str]]): List of page titles to extract documents from.
+
+        Returns:
+            List[Document]: A list of Document objects extracted from the specified pages.
+        """
+        if page_ids:
+            return self.extract_from_pages(page_ids)
+        return []
+
+    def extract_from_pages(self, pages: List[str]) -> List[Document]:
+        """
+        Extract documents from specific MediaWiki pages by their titles.
+
+        Args:
+            pages (List[str]): The list of page titles to extract documents from.
+
+        Returns:
+            List[Document]: A list of Document objects extracted from the specified pages.
+        """
+        try:
+            response = self.wikimedia_reader.load_data(pages=pages)
+            return response
+        except Exception as exp:
+            logging.error(f"Failed to extract from pages {pages}: {exp}")
+            return []
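For orientation, here is a minimal usage sketch of the extraction layer above, assuming the `dags` package is importable and the target wiki's `api.php` endpoint is reachable; it uses only names introduced in this diff:

```python
from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor

# point the extractor at a MediaWiki api.php endpoint and pull page titles
extractor = MediaWikiExtractor(api_url="https://en.wikipedia.org/w/api.php")
documents = extractor.extract(page_ids=["Main_Page"])

for doc in documents:
    # each llama_index Document carries the page text plus its URL in metadata
    print(doc.id_, doc.metadata["url"])
```

Note that `extract_from_pages` deliberately swallows reader errors and returns an empty list, so callers such as `process_mediawiki_etl` see "no documents" rather than an exception.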
+ """ + try: + response = self.wikimedia_reader.load_data(pages=pages) + return response + except Exception as e: + print(f"Failed to extract from pages {pages}: {str(e)}") + return [] diff --git a/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py new file mode 100644 index 00000000..1c3598b2 --- /dev/null +++ b/dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py @@ -0,0 +1,52 @@ +"""Simple reader that reads mediawiki.""" + +from typing import Any, List, Optional + +import wikipedia +from llama_index.legacy.readers.base import BasePydanticReader +from llama_index.legacy.schema import Document + + +class MediaWikiReader(BasePydanticReader): + """WikiMedia reader. + + Reads a page. + + """ + + is_remote: bool = True + + def __init__(self, api_url: Optional[str] = None) -> None: + """Initialize with parameters.""" + if api_url: + wikipedia.set_api_url(api_url) + + @classmethod + def class_name(cls) -> str: + return "WikipediaReader" + + def load_data(self, pages: List[str], **load_kwargs: Any) -> List[Document]: + """Load data from the input directory. + + Args: + pages (List[str]): List of pages to read. + + """ + import wikipedia + + results = [] + for page in pages: + wiki_page = wikipedia.page(page, **load_kwargs) + page_content = wiki_page.content + page_id = wiki_page.pageid + doc = Document( + id_=page_id, + text=page_content, + metadata={ + "url": wiki_page.url, + }, + excluded_embed_metadata_keys=["url"], + excluded_llm_metadata_keys=["url"], + ) + results.append(doc) + return results diff --git a/dags/hivemind_etl_helpers/src/utils/modules/__init__.py b/dags/hivemind_etl_helpers/src/utils/modules/__init__.py index 41de934a..89c563a4 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/__init__.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/__init__.py @@ -3,4 +3,5 @@ from .discourse import ModulesDiscourse from .gdrive import ModulesGDrive from .github import ModulesGitHub +from .mediawiki import ModulesMediaWiki from .notion import ModulesNotion diff --git a/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py new file mode 100644 index 00000000..92c9f6e1 --- /dev/null +++ b/dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py @@ -0,0 +1,53 @@ +from .modules_base import ModulesBase + + +class ModulesMediaWiki(ModulesBase): + def __init__(self) -> None: + self.platform_name = "mediawiki" + super().__init__() + + def get_learning_platforms( + self, + ) -> list[dict[str, str | list[str]]]: + """ + Get all the MediaWiki communities with their page titles. 
diff --git a/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py b/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py
index 440deacd..5b2197f6 100644
--- a/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py
+++ b/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py
@@ -76,10 +76,11 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str:
         """
         client = MongoSingleton.get_instance().client
-        user = self._get_platform_userid(platform_id)
+        user_id = self.get_platform_metadata(platform_id, "userId")
+        user_id = ObjectId(user_id)
         token_doc = client["Core"]["tokens"].find_one(
             {
-                "user": user,
+                "user": user_id,
                 "type": token_type,
             },
             {
@@ -89,13 +90,13 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str:
         )
         if token_doc is None:
             raise ValueError(
-                f"No Token for the given user {user} "
+                f"No Token for the given user {user_id} "
                 "in tokens collection of the Core database!"
             )
         token = token_doc["token"]
         return token
 
-    def _get_platform_userid(self, platform_id: ObjectId) -> str:
+    def get_platform_metadata(self, platform_id: ObjectId, metadata_name: str) -> str:
         """
-        get the userid that belongs to a platform
+        get a specific metadata field that belongs to a platform
@@ -103,6 +104,8 @@
         -----------
         platform_id : bson.ObjectId
-            the platform id we need their owner user id
+            the platform id to access its metadata
+        metadata_name : str
+            a specific field of metadata that we want
 
         Returns
         ---------
@@ -116,11 +119,11 @@
                 "_id": platform_id,
             },
             {
-                "metadata.userId": 1,
+                f"metadata.{metadata_name}": 1,
             },
         )
         if platform is None:
             raise ValueError(f"No platform available given platform id: {platform_id}")
 
-        user_id = platform["metadata"]["userId"]
-        return ObjectId(user_id)
+        metadata_field = platform["metadata"][metadata_name]
+        return metadata_field
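The `modules_base.py` change generalizes the old `_get_platform_userid` into `get_platform_metadata`, so the MediaWiki module can read `pageIds` through the same accessor `get_token` uses for `userId`. One caveat: the `-> str` annotation is now looser than reality, since `pageIds` comes back as a list. A sketch of both call sites, with a placeholder ObjectId:

```python
from bson import ObjectId

from hivemind_etl_helpers.src.utils.modules.modules_base import ModulesBase

modules = ModulesBase()
platform_id = ObjectId("6579c364f1120850414e0dc6")  # placeholder id

# the old behaviour, now expressed through the generic accessor;
# get_token() re-wraps the raw string in an ObjectId itself
user_id = ObjectId(modules.get_platform_metadata(platform_id, "userId"))

# the new use: any metadata field by name (here a list of page titles)
page_ids = modules.get_platform_metadata(platform_id, "pageIds")
```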
diff --git a/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py
new file mode 100644
index 00000000..93d247c9
--- /dev/null
+++ b/dags/hivemind_etl_helpers/tests/integration/test_mediawiki_modules.py
@@ -0,0 +1,170 @@
+from datetime import datetime
+from unittest import TestCase
+
+from bson import ObjectId
+from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki
+from hivemind_etl_helpers.src.utils.mongo import MongoSingleton
+
+
+class TestGetMediaWikiModules(TestCase):
+    def setUp(self):
+        client = MongoSingleton.get_instance().client
+        client["Core"].drop_collection("modules")
+        client["Core"].drop_collection("platforms")
+        self.client = client
+        self.modules_mediawiki = ModulesMediaWiki()
+
+    def test_get_empty_data(self):
+        result = self.modules_mediawiki.get_learning_platforms()
+        self.assertEqual(result, [])
+
+    def test_get_single_data(self):
+        platform_id = ObjectId("6579c364f1120850414e0dc6")
+        community_id = ObjectId("6579c364f1120850414e0dc5")
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id,
+                "name": "mediawiki",
+                "metadata": {
+                    "pageIds": [
+                        "Main_Page",
+                        "Help:Contents",
+                        "Sandbox",
+                    ],
+                },
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+        self.client["Core"]["modules"].insert_one(
+            {
+                "name": "hivemind",
+                "community": community_id,
+                "options": {
+                    "platforms": [
+                        {
+                            "platform": platform_id,
+                            "name": "mediawiki",
+                            "metadata": {
+                                "api_url": "http://example.com/api",
+                            },
+                        }
+                    ]
+                },
+            }
+        )
+
+        result = self.modules_mediawiki.get_learning_platforms()
+
+        self.assertIsInstance(result, list)
+        self.assertEqual(len(result), 1)
+        self.assertEqual(result[0]["community_id"], "6579c364f1120850414e0dc5")
+        self.assertEqual(
+            result[0]["page_titles"],
+            [
+                "Main_Page",
+                "Help:Contents",
+                "Sandbox",
+            ],
+        )
+        self.assertEqual(result[0]["base_url"], "http://example.com/api")
+
+    def test_get_mediawiki_communities_data_multiple_platforms(self):
+        """
+        Two mediawiki platforms for one community
+        """
+        platform_id1 = ObjectId("6579c364f1120850414e0dc6")
+        platform_id2 = ObjectId("6579c364f1120850414e0dc7")
+        community_id = ObjectId("1009c364f1120850414e0dc5")
+
+        self.client["Core"]["modules"].insert_one(
+            {
+                "name": "hivemind",
+                "community": community_id,
+                "options": {
+                    "platforms": [
+                        {
+                            "platform": platform_id1,
+                            "name": "mediawiki",
+                            "metadata": {
+                                "api_url": "http://example1.com/api",
+                            },
+                        },
+                        {
+                            "platform": platform_id2,
+                            "name": "mediawiki",
+                            "metadata": {
+                                "api_url": "http://example2.com/api",
+                            },
+                        },
+                    ]
+                },
+            }
+        )
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id1,
+                "name": "mediawiki",
+                "metadata": {
+                    "pageIds": [
+                        "Main_Page",
+                        "Help:Contents",
+                    ],
+                },
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id2,
+                "name": "mediawiki",
+                "metadata": {
+                    "pageIds": [
+                        "Sandbox",
+                        "Wikipedia:About",
+                    ],
+                },
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+
+        result = self.modules_mediawiki.get_learning_platforms()
+
+        self.assertIsInstance(result, list)
+        self.assertEqual(len(result), 2)
+        self.assertEqual(
+            result[0],
+            {
+                "community_id": str(community_id),
+                "page_titles": [
+                    "Main_Page",
+                    "Help:Contents",
+                ],
+                "base_url": "http://example1.com/api",
+            },
+        )
+        self.assertEqual(
+            result[1],
+            {
+                "community_id": str(community_id),
+                "page_titles": [
+                    "Sandbox",
+                    "Wikipedia:About",
+                ],
+                "base_url": "http://example2.com/api",
+            },
+        )
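The two tests above seed near-identical platform documents by hand. A small helper along these lines (hypothetical, not part of the diff) would cut the duplication if more cases are added:

```python
from datetime import datetime


def insert_mediawiki_platform(client, platform_id, community_id, page_ids):
    """Hypothetical test helper: seed one mediawiki platform document."""
    client["Core"]["platforms"].insert_one(
        {
            "_id": platform_id,
            "name": "mediawiki",
            "metadata": {"pageIds": page_ids},
            "community": community_id,
            "disconnectedAt": None,
            "connectedAt": datetime.now(),
            "createdAt": datetime.now(),
            "updatedAt": datetime.now(),
        }
    )
```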
"name": "mediawiki", + "metadata": { + "api_url": "http://example2.com/api", + }, + }, + ] + }, + } + ) + + self.client["Core"]["platforms"].insert_one( + { + "_id": platform_id1, + "name": "mediawiki", + "metadata": { + "pageIds": [ + "Main_Page", + "Help:Contents", + ], + }, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + + self.client["Core"]["platforms"].insert_one( + { + "_id": platform_id2, + "name": "mediawiki", + "metadata": { + "pageIds": [ + "Sandbox", + "Wikipedia:About", + ], + }, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + + result = self.modules_mediawiki.get_learning_platforms() + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 2) + self.assertEqual( + result[0], + { + "community_id": str(community_id), + "page_titles": [ + "Main_Page", + "Help:Contents", + ], + "base_url": "http://example1.com/api", + }, + ) + self.assertEqual( + result[1], + { + "community_id": str(community_id), + "page_titles": [ + "Sandbox", + "Wikipedia:About", + ], + "base_url": "http://example2.com/api", + }, + ) diff --git a/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py new file mode 100644 index 00000000..de96ce85 --- /dev/null +++ b/dags/hivemind_etl_helpers/tests/unit/test_mediawiki_extractor.py @@ -0,0 +1,67 @@ +import unittest +from unittest.mock import Mock + +from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor +from llama_index.core import Document + + +class TestMediaWikiExtractor(unittest.TestCase): + def setUp(self): + """ + Setup for each test case with a direct Mock of MediaWikiReader. + """ + self.api_url = "https://en.wikipedia.org/w/api.php" + self.extractor = MediaWikiExtractor(api_url=self.api_url) + self.mock_reader = Mock(spec=self.extractor.wikimedia_reader) + self.extractor.wikimedia_reader = self.mock_reader + + def test_initialization_with_api_url(self): + """ + Test that the extractor is initialized with the correct API URL. + """ + self.assertEqual(self.extractor.api_url, self.api_url) + + def test_extract_from_valid_pages(self): + """ + Test extracting from valid pages. + """ + mock_response = [Mock(spec=Document), Mock(spec=Document)] + self.mock_reader.load_data.return_value = mock_response + + test_pages = ["Python_(programming_language)", "OpenAI"] + documents = self.extractor.extract(page_ids=test_pages) + self.assertEqual(len(documents), len(mock_response)) + self.mock_reader.load_data.assert_called_once_with(pages=test_pages) + + def test_extract_no_pages(self): + """ + Test extracting with no pages provided. + Expecting empty results. + """ + documents = self.extractor.extract() + self.assertEqual(len(documents), 0) + self.mock_reader.load_data.assert_not_called() + + def test_handle_invalid_page_titles(self): + """ + Test handling of invalid page titles. + Expecting empty results. + """ + invalid_pages = ["Non_existent_page"] + self.mock_reader.load_data.return_value = [] + + documents = self.extractor.extract(page_ids=invalid_pages) + self.assertEqual(len(documents), 0) + self.mock_reader.load_data.assert_called_with(pages=invalid_pages) + + def test_extract_from_valid_pages_with_exception(self): + """ + Test extracting from valid pages with an exception occurring. + Expecting empty results. 
+ """ + test_pages = ["Python_(programming_language)"] + self.mock_reader.load_data.side_effect = Exception("Mocked exception") + + documents = self.extractor.extract(page_ids=test_pages) + self.assertEqual(len(documents), 0) + self.mock_reader.load_data.assert_called_once_with(pages=test_pages) diff --git a/dags/hivemind_mediawiki_etl.py b/dags/hivemind_mediawiki_etl.py new file mode 100644 index 00000000..362d4164 --- /dev/null +++ b/dags/hivemind_mediawiki_etl.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import logging +from datetime import datetime + +from airflow import DAG +from airflow.decorators import task +from dags.hivemind_etl_helpers.mediawiki_etl import process_mediawiki_etl +from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki + +with DAG( + dag_id="mediawiki_vector_store_update", + start_date=datetime(2024, 2, 21), + schedule_interval="0 4 * * *", +) as dag: + + @task + def get_mediawiki_communities() -> list[dict[str, str | list[str]]]: + """ + Getting all communities having mediawiki from database + """ + communities = ModulesMediaWiki().get_learning_platforms() + return communities + + @task + def start_mediawiki_vectorstore(community_info: dict[str, str | list[str]]): + community_id = community_info["community_id"] + page_titles = community_info["page_titles"] + api_url = community_info["base_url"] + + logging.info(f"Working on community, {community_id}") + process_mediawiki_etl( + community_id=community_id, # type: ignore + page_titles=page_titles, # type: ignore + api_url=api_url, # type: ignore + ) + logging.info(f"Community {community_id} Job finished!") + + communities_info = get_mediawiki_communities() + start_mediawiki_vectorstore.expand(community_info=communities_info) diff --git a/requirements.txt b/requirements.txt index 42fe1d9e..7689fa21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,3 +24,4 @@ llama-index-readers-google==0.2.5 llama-index-storage-docstore-redis==0.1.2 llama-index-storage-docstore-mongodb==0.1.3 redis===5.0.4 +tc-wikipedia-lib==1.0.0