Merge pull request #154 from TogetherCrew/feat/mediawiki-ingest-and-vectorize

feat: `WikipediaReader` that accepts `api_url` -> `MediaWikiReader`;
cyri113 authored May 22, 2024
2 parents 5ce6b54 + d8f3975 commit be19f29
Showing 11 changed files with 485 additions and 7 deletions.
42 changes: 42 additions & 0 deletions dags/hivemind_etl_helpers/mediawiki_etl.py
@@ -0,0 +1,42 @@
import logging

from dags.hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline
from dags.hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor


def process_mediawiki_etl(
    community_id: str,
    api_url: str,
    page_titles: list[str],
) -> None:
    """
    Process the given MediaWiki pages and feed the extracted
    documents into the ingestion pipeline.

    Parameters
    -----------
    community_id : str
        the community to save its data
    api_url : str
        the MediaWiki API endpoint to extract page data from
    page_titles : list[str]
        the page titles to process their data
        must not be None
    """
    if page_titles is None:
        raise ValueError("The `page_titles` must be given!")
    try:
        mediawiki_extractor = MediaWikiExtractor(api_url)
        documents = mediawiki_extractor.extract(
            page_ids=page_titles,
        )
    except TypeError as exp:
        logging.info(f"No documents retrieved from MediaWiki! exp: {exp}")
        # nothing was extracted, so there is nothing to ingest
        return

    ingestion_pipeline = CustomIngestionPipeline(
        community_id=community_id, collection_name="mediawiki"
    )
    try:
        ingestion_pipeline.run_pipeline(docs=documents)
    except Exception as e:
        logging.info(f"Error while trying to run MediaWikiIngestionPipeline! exp: {e}")
49 changes: 49 additions & 0 deletions dags/hivemind_etl_helpers/src/db/mediawiki/extractor.py
@@ -0,0 +1,49 @@
import logging
from typing import List, Optional

from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader
from llama_index.core import Document


class MediaWikiExtractor:
    def __init__(self, api_url: Optional[str] = "https://en.wikipedia.org/w/api.php"):
        """
        Initializes the MediaWikiExtractor with an API URL for a MediaWiki instance.
        If no URL is provided, it falls back to the English Wikipedia endpoint.

        Args:
            api_url (Optional[str]): The MediaWiki API URL.
                Defaults to "https://en.wikipedia.org/w/api.php".
        """
        self.api_url = api_url
        self.wikimedia_reader = MediaWikiReader(api_url=self.api_url)

    def extract(self, page_ids: Optional[List[str]] = None) -> List[Document]:
        """
        Extracts documents from MediaWiki pages referenced by their titles.

        Args:
            page_ids (Optional[List[str]]): List of page titles to extract documents from.

        Returns:
            List[Document]: A list of Document objects extracted from the specified pages.
        """
        if page_ids:
            return self.extract_from_pages(page_ids)
        return []

    def extract_from_pages(self, pages: List[str]) -> List[Document]:
        """
        Extracts documents from specific MediaWiki pages by their titles.

        Args:
            pages (List[str]): The list of page titles to extract documents from.

        Returns:
            List[Document]: A list of Document objects extracted from the specified pages.
        """
        try:
            return self.wikimedia_reader.load_data(pages=pages)
        except Exception as e:
            logging.error(f"Failed to extract from pages {pages}: {str(e)}")
            return []
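
A short usage sketch of the extractor on its own (endpoint and page title are illustrative):

# Hypothetical standalone use of MediaWikiExtractor.
from hivemind_etl_helpers.src.db.mediawiki.extractor import MediaWikiExtractor

extractor = MediaWikiExtractor(api_url="https://en.wikipedia.org/w/api.php")
documents = extractor.extract(page_ids=["Python (programming language)"])
for doc in documents:
    print(doc.id_, doc.metadata["url"])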
52 changes: 52 additions & 0 deletions dags/hivemind_etl_helpers/src/db/mediawiki/mediawiki_reader.py
@@ -0,0 +1,52 @@
"""Simple reader that reads mediawiki."""

from typing import Any, List, Optional

import wikipedia
from llama_index.legacy.readers.base import BasePydanticReader
from llama_index.legacy.schema import Document


class MediaWikiReader(BasePydanticReader):
"""WikiMedia reader.
Reads a page.
"""

is_remote: bool = True

def __init__(self, api_url: Optional[str] = None) -> None:
"""Initialize with parameters."""
if api_url:
wikipedia.set_api_url(api_url)

@classmethod
def class_name(cls) -> str:
return "WikipediaReader"

def load_data(self, pages: List[str], **load_kwargs: Any) -> List[Document]:
"""Load data from the input directory.
Args:
pages (List[str]): List of pages to read.
"""
import wikipedia

results = []
for page in pages:
wiki_page = wikipedia.page(page, **load_kwargs)
page_content = wiki_page.content
page_id = wiki_page.pageid
doc = Document(
id_=page_id,
text=page_content,
metadata={
"url": wiki_page.url,
},
excluded_embed_metadata_keys=["url"],
excluded_llm_metadata_keys=["url"],
)
results.append(doc)
return results
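
The reader can also be used directly; a sketch (page title illustrative) showing that the page url travels as metadata but stays out of embeddings and prompts:

# Hypothetical direct use of MediaWikiReader.
from hivemind_etl_helpers.src.db.mediawiki.mediawiki_reader import MediaWikiReader

reader = MediaWikiReader(api_url="https://en.wikipedia.org/w/api.php")
docs = reader.load_data(pages=["Main_Page"])

# "url" is present in metadata yet listed in both excluded_*_metadata_keys,
# so it is skipped when embedding and when building LLM prompts.
print(docs[0].id_, docs[0].metadata["url"])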
1 change: 1 addition & 0 deletions dags/hivemind_etl_helpers/src/utils/modules/__init__.py
@@ -3,4 +3,5 @@
 from .discourse import ModulesDiscourse
 from .gdrive import ModulesGDrive
 from .github import ModulesGitHub
+from .mediawiki import ModulesMediaWiki
 from .notion import ModulesNotion
53 changes: 53 additions & 0 deletions dags/hivemind_etl_helpers/src/utils/modules/mediawiki.py
@@ -0,0 +1,53 @@
from .modules_base import ModulesBase


class ModulesMediaWiki(ModulesBase):
    def __init__(self) -> None:
        self.platform_name = "mediawiki"
        super().__init__()

    def get_learning_platforms(
        self,
    ) -> list[dict[str, str | list[str]]]:
        """
        Get all the MediaWiki communities with their page titles.

        Returns
        ---------
        communities_data : list[dict[str, str | list[str]]]
            a list of MediaWiki data information

            example data output:
            ```
            [{
                "community_id": "6579c364f1120850414e0dc5",
                "page_titles": ["Main_Page", "Default_Page"],
                "base_url": "some_api_url",
            }]
            ```
        """
        modules = self.query(platform=self.platform_name, projection={"name": 0})
        communities_data: list[dict[str, str | list[str]]] = []

        for module in modules:
            community = module["community"]

            # each platform of the community
            for platform in module["options"]["platforms"]:
                if platform["name"] != self.platform_name:
                    continue

                page_ids = self.get_platform_metadata(
                    platform_id=platform["platform"],
                    metadata_name="pageIds",
                )
                modules_options = platform["metadata"]
                communities_data.append(
                    {
                        "community_id": str(community),
                        "page_titles": page_ids,
                        "base_url": modules_options.get("api_url"),
                    }
                )

        return communities_data
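
The loop above implies a particular shape for the stored module documents. A hypothetical example inferred purely from the key lookups in this method, not an authoritative schema:

from bson import ObjectId

# Hypothetical module document; all values are placeholders. The
# "platform" id points at a Core `platforms` document whose metadata
# is expected to hold "pageIds".
module = {
    "community": ObjectId("6579c364f1120850414e0dc5"),
    "options": {
        "platforms": [
            {
                "name": "mediawiki",
                "platform": ObjectId("6579c364f1120850414e0dc6"),
                "metadata": {"api_url": "https://wiki.example.org/w/api.php"},
            }
        ]
    },
}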
17 changes: 10 additions & 7 deletions dags/hivemind_etl_helpers/src/utils/modules/modules_base.py
@@ -76,10 +76,11 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str:
         """
         client = MongoSingleton.get_instance().client
 
-        user = self._get_platform_userid(platform_id)
+        user_id = self.get_platform_metadata(platform_id, "userId")
+        user_id = ObjectId(user_id)
         token_doc = client["Core"]["tokens"].find_one(
             {
-                "user": user,
+                "user": user_id,
                 "type": token_type,
             },
             {
@@ -89,20 +90,22 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str:
         )
         if token_doc is None:
             raise ValueError(
-                f"No Token for the given user {user} "
+                f"No Token for the given user {user_id} "
                 "in tokens collection of the Core database!"
             )
         token = token_doc["token"]
         return token
 
-    def _get_platform_userid(self, platform_id: ObjectId) -> str:
+    def get_platform_metadata(self, platform_id: ObjectId, metadata_name: str) -> str:
         """
         get the userid that belongs to a platform
 
         Parameters
         -----------
         platform_id : bson.ObjectId
             the platform id we need their owner user id
+        metadata_name : str
+            a specific field of metadata that we want
 
         Returns
         ---------
@@ -116,11 +119,11 @@ def _get_platform_userid(self, platform_id: ObjectId) -> str:
                 "_id": platform_id,
             },
             {
-                "metadata.userId": 1,
+                f"metadata.{metadata_name}": 1,
             },
         )
         if platform is None:
             raise ValueError(f"No platform available given platform id: {platform_id}")
 
-        user_id = platform["metadata"]["userId"]
-        return ObjectId(user_id)
+        metadata_field = platform["metadata"][metadata_name]
+        return metadata_field
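
With the rename, the helper is generic over metadata fields, and both call sites in this PR go through it. A small sketch (the platform id is a placeholder):

from bson import ObjectId

from hivemind_etl_helpers.src.utils.modules import ModulesMediaWiki

modules = ModulesMediaWiki()
platform_id = ObjectId("6579c364f1120850414e0dc6")  # placeholder id

# get_token's path: resolve the owning user's id from platform metadata
user_id = modules.get_platform_metadata(platform_id, "userId")

# ModulesMediaWiki's path: resolve the configured page titles
page_ids = modules.get_platform_metadata(
    platform_id=platform_id, metadata_name="pageIds"
)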