From 3c0e8d951de2174b70e0501fc83127c13e5ff520 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Wed, 16 Oct 2024 15:21:09 +0330 Subject: [PATCH 01/13] wip: crawler is just completed! --- .../src/db/website/__init__.py | 0 .../src/db/website/crawlee_client.py | 61 ++++++++++++ .../src/utils/modules/__init__.py | 1 + .../src/utils/modules/modules_base.py | 6 +- .../src/utils/modules/website.py | 63 ++++++++++++ dags/hivemind_etl_helpers/website_etl.py | 44 +++++++++ dags/hivemind_website_ingestion.py | 98 +++++++++++++++++++ requirements.txt | 1 + 8 files changed, 271 insertions(+), 3 deletions(-) create mode 100644 dags/hivemind_etl_helpers/src/db/website/__init__.py create mode 100644 dags/hivemind_etl_helpers/src/db/website/crawlee_client.py create mode 100644 dags/hivemind_etl_helpers/src/utils/modules/website.py create mode 100644 dags/hivemind_etl_helpers/website_etl.py create mode 100644 dags/hivemind_website_ingestion.py diff --git a/dags/hivemind_etl_helpers/src/db/website/__init__.py b/dags/hivemind_etl_helpers/src/db/website/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py new file mode 100644 index 00000000..ead2f709 --- /dev/null +++ b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py @@ -0,0 +1,61 @@ +from typing import Any + +from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext +import xml.etree.ElementTree as ET + + +class CrawleeClient: + def __init__(self) -> None: + self.crawler = PlaywrightCrawler( + max_requests_per_crawl=20, + headless=False, + browser_type='chromium', + ) + @self.crawler.router.default_handler + async def request_handler(context: PlaywrightCrawlingContext) -> None: + context.log.info(f'Processing {context.request.url} ...') + + inner_text = await context.page.inner_text(selector="body") + + if "sitemap.xml" in context.request.url: + links = self._extract_links_from_sitemap(inner_text) + await context.add_requests(requests=list(set(links))) + else: + await context.enqueue_links() + + data = { + 'url': context.request.url, + 'title': await context.page.title(), + 'inner_text': inner_text, + } + + await context.push_data(data) + + def _extract_links_from_sitemap(self, sitemap_content: str): + """ + extract link from sitemaps + """ + root = ET.fromstring(sitemap_content) + namespace = {"ns": "http://www.sitemaps.org/schemas/sitemap/0.9"} + links = [element.text.strip() for element in root.findall("ns:url/ns:loc", namespace)] + + return links + + async def crawl(self, links: list[str]) -> list[dict[str, Any]]: + """ + crawl a website and all inner links under the domain routes + + Parameters + ------------ + links : list[str] + the website link or links to crawl + + Returns + -------- + crawled_data : list[dict[str, Any]] + the data we've crawled from a website + """ + await self.crawler.add_requests(requests=links) + await self.crawler.run() + crawled_data = await self.crawler.get_data() + return crawled_data diff --git a/dags/hivemind_etl_helpers/src/utils/modules/__init__.py b/dags/hivemind_etl_helpers/src/utils/modules/__init__.py index 89c563a4..98b5e5a4 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/__init__.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/__init__.py @@ -5,3 +5,4 @@ from .github import ModulesGitHub from .mediawiki import ModulesMediaWiki from .notion import ModulesNotion +from .website import ModulesWebsite diff --git 
a/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py b/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py index 7f348e55..586bc1ac 100644 --- a/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py +++ b/dags/hivemind_etl_helpers/src/utils/modules/modules_base.py @@ -98,7 +98,7 @@ def get_token(self, platform_id: ObjectId, token_type: str) -> str: def get_platform_metadata( self, platform_id: ObjectId, metadata_name: str - ) -> str | dict: + ) -> str | dict | list: """ get the userid that belongs to a platform @@ -111,8 +111,8 @@ def get_platform_metadata( Returns --------- - user_id : str - the user id that the platform belongs to + metadata_value : Any + the values that the metadata belongs to """ client = MongoSingleton.get_instance().get_client() diff --git a/dags/hivemind_etl_helpers/src/utils/modules/website.py b/dags/hivemind_etl_helpers/src/utils/modules/website.py new file mode 100644 index 00000000..8b473570 --- /dev/null +++ b/dags/hivemind_etl_helpers/src/utils/modules/website.py @@ -0,0 +1,63 @@ +import logging + +from .modules_base import ModulesBase + + +class ModulesWebsite(ModulesBase): + def __init__(self) -> None: + self.platform_name = "website" + super().__init__() + + def get_learning_platforms( + self, + ) -> list[dict[str, str | list[str]]]: + """ + Get all the website communities with their page titles. + + Returns + --------- + community_orgs : list[dict[str, str | list[str]]] = [] + a list of website data information + + example data output: + ``` + [{ + "community_id": "6579c364f1120850414e0dc5", + "platform_id": "6579c364f1120850414e0dc6", + "urls": ["link1", "link2"], + }] + ``` + """ + modules = self.query(platform=self.platform_name, projection={"name": 0}) + communities_data: list[dict[str, str | list[str]]] = [] + + for module in modules: + community = module["community"] + + # each platform of the community + for platform in module["options"]["platforms"]: + if platform["name"] != self.platform_name: + continue + + platform_id = platform["platform"] + + try: + website_links = self.get_platform_metadata( + platform_id=platform_id, + metadata_name="resources", + ) + + communities_data.append( + { + "community_id": str(community), + "platform_id": platform_id, + "urls": website_links, + } + ) + except Exception as exp: + logging.error( + "Exception while fetching website modules " + f"for platform: {platform_id} | exception: {exp}" + ) + + return communities_data diff --git a/dags/hivemind_etl_helpers/website_etl.py b/dags/hivemind_etl_helpers/website_etl.py new file mode 100644 index 00000000..355432ee --- /dev/null +++ b/dags/hivemind_etl_helpers/website_etl.py @@ -0,0 +1,44 @@ +from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline +from llama_index.core import Document +from hivemind_etl_helpers.src.db.website.crawlee_client import CrawleeClient + + +class WebsiteETL: + def __init__( + self, + community_id: str, + ) -> None: + """ + Parameters + ----------- + community_id : str + the community to save its data + access_token : str | None + notion ingegration access token + """ + self.community_id = community_id + collection_name = "website" + + # preparing the data extractor and ingestion pipelines + self.crawlee_client = CrawleeClient() + self.ingestion_pipeline = CustomIngestionPipeline( + self.community_id, collection_name=collection_name + ) + + def extract( + self, urls: list[str], + ) -> list: + """ + extract data + """ + extracted_data = self.crawlee_client.crawl(urls) + + return extracted_data + + def 
transform(self, raw_data: list) -> list[Document]: + # transforming + pass + + def load(self, documents: list[Document]) -> None: + # loading data into db + self.ingestion_pipeline.run_pipeline(docs=documents) diff --git a/dags/hivemind_website_ingestion.py b/dags/hivemind_website_ingestion.py new file mode 100644 index 00000000..99596046 --- /dev/null +++ b/dags/hivemind_website_ingestion.py @@ -0,0 +1,98 @@ +# TODO: NOT COMPLETED AND MISSING CODES + +import logging +from datetime import datetime + +from airflow import DAG +from airflow.decorators import task +from dotenv import load_dotenv +from hivemind_etl_helpers.discord_mongo_summary_etl import process_discord_summaries +from hivemind_etl_helpers.discord_mongo_vector_store_etl import ( + process_discord_guild_mongo, +) +from hivemind_etl_helpers.src.utils.modules import ModulesWebsite + +with DAG( + dag_id="website_ingestion_embedding", + start_date=datetime(2024, 1, 1), + schedule_interval="0 2 * * *", + catchup=False, + max_active_runs=1, +) as dag: + + @task + def get_website_communities() -> list[dict[str, str | datetime | list]]: + """ + Getting all communities having discord from database + """ + communities = ModulesWebsite().get_learning_platforms() + return communities + + @task + def start_website_embedding(community_info: dict[str, str | datetime | list]): + load_dotenv() + community_id = community_info["community_id"] + platform_id = community_info["platform_id"] + links = community_info["urls"] + + logging.info( + f"Processing community_id: {community_id} | platform_id: {platform_id}" + ) + process_discord_guild_mongo( + community_id=community_id, + platform_id=platform_id, + selected_channels=selected_channels, + default_from_date=from_date, + ) + logging.info( + f"Community {community_id} Job finished | platform_id: {platform_id}" + ) + + communities_info = get_website_communities() + start_discord_vectorstore.expand(community_info=communities_info) + + +with DAG( + dag_id="discord_summary_vector_store", + start_date=datetime(2024, 1, 1), + schedule_interval="0 2 * * *", + catchup=False, + max_active_runs=1, +) as dag: + + @task + def get_mongo_discord_communities() -> list[dict[str, str | datetime | list]]: + """ + Getting all communities having discord from database + this function is the same with `get_discord_communities` + we just changed the name for the pylint + """ + communities = ModulesDiscord().get_learning_platforms() + return communities + + @task + def start_discord_summary_vectorstore( + community_info: dict[str, str | datetime | list] + ): + load_dotenv() + + community_id = community_info["community_id"] + platform_id = community_info["platform_id"] + selected_channels = community_info["selected_channels"] + from_date = community_info["from_date"] + logging.info( + f"Working on community, {community_id}| platform_id: {platform_id}" + ) + process_discord_summaries( + community_id=community_id, + platform_id=platform_id, + selected_channels=selected_channels, + default_from_date=from_date, + verbose=False, + ) + logging.info( + f"Community {community_id} Job finished | platform_id: {platform_id}" + ) + + communities = get_mongo_discord_communities() + start_discord_summary_vectorstore.expand(community_info=communities) diff --git a/requirements.txt b/requirements.txt index 2312c2b2..0bc3010e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,3 +22,4 @@ tc-wikipedia-lib==1.0.1 llama-index-readers-file==0.1.22 docx2txt==0.8 tc-analyzer-lib==1.4.11 +crawlee[playwright]==0.3.8 \ No newline at end of 
file From 87e6f95830446250c76debee81d6af19cc643c60 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Sun, 17 Nov 2024 14:45:20 +0330 Subject: [PATCH 02/13] feat: Completed data ingestion! wip. --- dags/hivemind_etl_helpers/website_etl.py | 89 ++++++++++++++++++++---- 1 file changed, 77 insertions(+), 12 deletions(-) diff --git a/dags/hivemind_etl_helpers/website_etl.py b/dags/hivemind_etl_helpers/website_etl.py index 355432ee..2aa18921 100644 --- a/dags/hivemind_etl_helpers/website_etl.py +++ b/dags/hivemind_etl_helpers/website_etl.py @@ -1,3 +1,6 @@ +from typing import Any + +import hashlib from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline from llama_index.core import Document from hivemind_etl_helpers.src.db.website.crawlee_client import CrawleeClient @@ -25,20 +28,82 @@ def __init__( self.community_id, collection_name=collection_name ) - def extract( + async def extract( self, urls: list[str], - ) -> list: - """ - extract data - """ - extracted_data = self.crawlee_client.crawl(urls) + ) -> list[dict[str, Any]]: + """ + Extract given urls + + Parameters + ----------- + urls : list[str] + a list of urls + + Returns + --------- + extracted_data : list[dict[str, Any]] + The crawled data from urls + """ + extracted_data = await self.crawlee_client.crawl(urls) - return extracted_data + return extracted_data - def transform(self, raw_data: list) -> list[Document]: - # transforming - pass + def transform(self, raw_data: list[dict[str, Any]]) -> list[Document]: + """ + transform raw data to llama-index documents + + Parameters + ------------ + raw_data : list[dict[str, Any]] + crawled data + + Returns + --------- + documents : list[llama_index.Document] + list of llama-index documents + """ + documents: list[Document] = [] + + for data in raw_data: + doc_id = data["url"] + doc = Document( + doc_id=doc_id, + text=data["inner_text"], + metadata={ + "title": data["title"], + "url": data["url"], + } + ) + documents.append(doc) + + return documents def load(self, documents: list[Document]) -> None: - # loading data into db - self.ingestion_pipeline.run_pipeline(docs=documents) + """ + load the documents into the vector db + + Parameters + ------------- + documents: list[llama_index.Document] + the llama-index documents to be ingested + """ + # loading data into db + self.ingestion_pipeline.run_pipeline(docs=documents) + + + def _prepare_id(data: str) -> str: + """ + hash the given data to prepare an id for the document + + Parameters + ----------- + data : str + the string data to be hashed + + Returns + -------- + hashed_data : str + data hashed using sha256 algorithm + """ + hashed_data = hashlib.sha256(data.encode('utf-8')) + return hashed_data.hexdigest() From 0c314d9e9cfb7adbde3008845d85a1439c1ea583 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Sun, 17 Nov 2024 16:23:54 +0330 Subject: [PATCH 03/13] feat: bump analyzer lib version! to resolve a dependency conflict for python-dateutil --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 0bc3010e..a711120b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,5 +21,5 @@ redis===5.0.4 tc-wikipedia-lib==1.0.1 llama-index-readers-file==0.1.22 docx2txt==0.8 -tc-analyzer-lib==1.4.11 +tc-analyzer-lib==1.4.13 crawlee[playwright]==0.3.8 \ No newline at end of file From 4e9ef3615512b1f38a74ce99a9b82f31c4b4227b Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Sun, 17 Nov 2024 16:32:40 +0330 Subject: [PATCH 04/13] just to re-run the CI! 
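A minimal usage sketch of the WebsiteETL flow completed in the patches above (extract -> transform -> load). The community id and URLs below are placeholders for illustration only, and running it assumes the hivemind_etl_helpers package plus the project's Qdrant/MongoDB configuration are available; since extract() is a coroutine, it is driven with asyncio.run here, mirroring how the ingestion DAG later invokes it.

import asyncio

from hivemind_etl_helpers.website_etl import WebsiteETL

# hypothetical inputs, not taken from the patches
community_id = "6579c364f1120850414e0dc5"
urls = ["https://example.com"]

etl = WebsiteETL(community_id=community_id)

# extract: crawl the given URLs with the Crawlee client (async step)
raw_data = asyncio.run(etl.extract(urls=urls))

# transform: convert crawled pages into llama-index Documents
documents = etl.transform(raw_data=raw_data)

# load: push the documents through the ingestion pipeline into the vector store
etl.load(documents=documents)
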
From 1b90754eab0ceba43ef1db6fea52d4fe5734c45c Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 18 Nov 2024 11:53:51 +0330 Subject: [PATCH 05/13] feat: code cleaning + codeRabbitAI suggestions! --- .../src/db/website/crawlee_client.py | 109 +++++++++++++----- .../tests/unit/test_website_etl.py | 92 +++++++++++++++ dags/hivemind_etl_helpers/website_etl.py | 12 +- 3 files changed, 179 insertions(+), 34 deletions(-) create mode 100644 dags/hivemind_etl_helpers/tests/unit/test_website_etl.py diff --git a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py index ead2f709..829f0093 100644 --- a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py +++ b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py @@ -1,20 +1,30 @@ from typing import Any - +import asyncio from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext -import xml.etree.ElementTree as ET +from defusedxml import ElementTree as ET class CrawleeClient: - def __init__(self) -> None: + def __init__( + self, + max_requests: int = 20, + headless: bool = True, + browser_type: str = "chromium", + ) -> None: self.crawler = PlaywrightCrawler( - max_requests_per_crawl=20, - headless=False, - browser_type='chromium', + max_requests_per_crawl=max_requests, + headless=headless, + browser_type=browser_type, ) + + # do not persist crawled data to local storage + self.crawler._configuration.persist_storage = False + self.crawler._configuration.write_metadata = False + @self.crawler.router.default_handler async def request_handler(context: PlaywrightCrawlingContext) -> None: - context.log.info(f'Processing {context.request.url} ...') - + context.log.info(f"Processing {context.request.url} ...") + inner_text = await context.page.inner_text(selector="body") if "sitemap.xml" in context.request.url: @@ -24,38 +34,83 @@ async def request_handler(context: PlaywrightCrawlingContext) -> None: await context.enqueue_links() data = { - 'url': context.request.url, - 'title': await context.page.title(), - 'inner_text': inner_text, + "url": context.request.url, + "title": await context.page.title(), + "inner_text": inner_text, } await context.push_data(data) - def _extract_links_from_sitemap(self, sitemap_content: str): + def _extract_links_from_sitemap(self, sitemap_content: str) -> list[str]: """ - extract link from sitemaps + + Extract URLs from a sitemap XML content. + + Parameters + ---------- + sitemap_content : str + The XML content of the sitemap + + Raises + ------ + ET.ParseError + If the XML content is malformed + + Returns + ------- + links : list[str] + list of valid URLs extracted from the sitemap """ - root = ET.fromstring(sitemap_content) - namespace = {"ns": "http://www.sitemaps.org/schemas/sitemap/0.9"} - links = [element.text.strip() for element in root.findall("ns:url/ns:loc", namespace)] - + try: + root = ET.fromstring(sitemap_content) + namespace = {"ns": "http://www.sitemaps.org/schemas/sitemap/0.9"} + links = [] + for element in root.findall("ns:url/ns:loc", namespace): + url = element.text.strip() if element.text else None + if url and url.startswith(("http://", "https://")): + links.append(url) + return links + except ET.ParseError as e: + raise ValueError(f"Invalid sitemap XML: {str(e)}") + return links - + async def crawl(self, links: list[str]) -> list[dict[str, Any]]: """ - crawl a website and all inner links under the domain routes + Crawl websites and extract data from all inner links under the domain routes. 
Parameters - ------------ + ---------- links : list[str] - the website link or links to crawl + List of valid URLs to crawl Returns - -------- + ------- crawled_data : list[dict[str, Any]] - the data we've crawled from a website + List of dictionaries containing crawled data with keys: + - url: str + - title: str + - inner_text: str + + Raises + ------ + ValueError + If any of the input URLs is invalid (not starting with http or https) + TimeoutError + If the crawl operation times out """ - await self.crawler.add_requests(requests=links) - await self.crawler.run() - crawled_data = await self.crawler.get_data() - return crawled_data + # Validate input URLs + valid_links = [] + for url in links: + if url and isinstance(url, str) and url.startswith(("http://", "https://")): + valid_links.append(url) + else: + raise ValueError(f"Invalid URL: {url}") + + try: + await self.crawler.add_requests(requests=valid_links) + await asyncio.wait_for(self.crawler.run(), timeout=3600) # 1 hour timeout + crawled_data = await self.crawler.get_data() + return crawled_data.items + except asyncio.TimeoutError: + raise TimeoutError("Crawl operation timed out") diff --git a/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py b/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py new file mode 100644 index 00000000..49f08c30 --- /dev/null +++ b/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py @@ -0,0 +1,92 @@ +from unittest import IsolatedAsyncioTestCase +from dotenv import load_dotenv +from unittest.mock import AsyncMock, MagicMock +from llama_index.core import Document +from hivemind_etl_helpers.website_etl import WebsiteETL +import hashlib + + +class TestWebsiteETL(IsolatedAsyncioTestCase): + def setUp(self): + """ + Setup for the test cases. Initializes a WebsiteETL instance with mocked dependencies. + """ + load_dotenv() + self.community_id = "test_community" + self.website_etl = WebsiteETL(self.community_id) + self.website_etl.crawlee_client = AsyncMock() + self.website_etl.ingestion_pipeline = MagicMock() + + async def test_extract(self): + """ + Test the extract method. + """ + urls = ["https://example.com"] + mocked_data = [ + { + "url": "https://example.com", + "inner_text": "Example text", + "title": "Example", + } + ] + self.website_etl.crawlee_client.crawl.return_value = mocked_data + + extracted_data = await self.website_etl.extract(urls) + + self.assertEqual(extracted_data, mocked_data) + self.website_etl.crawlee_client.crawl.assert_awaited_once_with(urls) + + def test_transform(self): + """ + Test the transform method. + """ + raw_data = [ + { + "url": "https://example.com", + "inner_text": "Example text", + "title": "Example", + } + ] + expected_documents = [ + Document( + doc_id="https://example.com", + text="Example text", + metadata={"title": "Example", "url": "https://example.com"}, + ) + ] + + documents = self.website_etl.transform(raw_data) + + self.assertEqual(len(documents), len(expected_documents)) + self.assertEqual(documents[0].doc_id, expected_documents[0].doc_id) + self.assertEqual(documents[0].text, expected_documents[0].text) + self.assertEqual(documents[0].metadata, expected_documents[0].metadata) + + def test_load(self): + """ + Test the load method. 
+ """ + documents = [ + Document( + doc_id="https://example.com", + text="Example text", + metadata={"title": "Example", "url": "https://example.com"}, + ) + ] + + self.website_etl.load(documents) + + self.website_etl.ingestion_pipeline.run_pipeline.assert_called_once_with( + docs=documents + ) + + def test_prepare_id(self): + """ + Test the _prepare_id method. + """ + data = "example_data" + expected_hash = hashlib.sha256(data.encode("utf-8")).hexdigest() + + hashed_data = WebsiteETL._prepare_id(data) + + self.assertEqual(hashed_data, expected_hash) diff --git a/dags/hivemind_etl_helpers/website_etl.py b/dags/hivemind_etl_helpers/website_etl.py index 2aa18921..83551678 100644 --- a/dags/hivemind_etl_helpers/website_etl.py +++ b/dags/hivemind_etl_helpers/website_etl.py @@ -16,8 +16,6 @@ def __init__( ----------- community_id : str the community to save its data - access_token : str | None - notion ingegration access token """ self.community_id = community_id collection_name = "website" @@ -29,7 +27,8 @@ def __init__( ) async def extract( - self, urls: list[str], + self, + urls: list[str], ) -> list[dict[str, Any]]: """ Extract given urls @@ -47,7 +46,7 @@ async def extract( extracted_data = await self.crawlee_client.crawl(urls) return extracted_data - + def transform(self, raw_data: list[dict[str, Any]]) -> list[Document]: """ transform raw data to llama-index documents @@ -72,7 +71,7 @@ def transform(self, raw_data: list[dict[str, Any]]) -> list[Document]: metadata={ "title": data["title"], "url": data["url"], - } + }, ) documents.append(doc) @@ -90,7 +89,6 @@ def load(self, documents: list[Document]) -> None: # loading data into db self.ingestion_pipeline.run_pipeline(docs=documents) - def _prepare_id(data: str) -> str: """ hash the given data to prepare an id for the document @@ -105,5 +103,5 @@ def _prepare_id(data: str) -> str: hashed_data : str data hashed using sha256 algorithm """ - hashed_data = hashlib.sha256(data.encode('utf-8')) + hashed_data = hashlib.sha256(data.encode("utf-8")) return hashed_data.hexdigest() From ad7d945db93ad683826a13428444696ceef2a379 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 18 Nov 2024 12:01:39 +0330 Subject: [PATCH 06/13] feat: added missing defusedxml library! --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a711120b..4359de97 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,4 +22,5 @@ tc-wikipedia-lib==1.0.1 llama-index-readers-file==0.1.22 docx2txt==0.8 tc-analyzer-lib==1.4.13 -crawlee[playwright]==0.3.8 \ No newline at end of file +crawlee[playwright]==0.3.8 +defusedxml==0.7.1 \ No newline at end of file From 2dce56eef987995a09e6d6236ae933802c65ffb5 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 18 Nov 2024 14:30:02 +0330 Subject: [PATCH 07/13] feat: completed the website ingestion DAG! 
--- dags/hivemind_website_ingestion.py | 75 ++++++------------------------ 1 file changed, 14 insertions(+), 61 deletions(-) diff --git a/dags/hivemind_website_ingestion.py b/dags/hivemind_website_ingestion.py index 99596046..5f39f54d 100644 --- a/dags/hivemind_website_ingestion.py +++ b/dags/hivemind_website_ingestion.py @@ -1,21 +1,17 @@ -# TODO: NOT COMPLETED AND MISSING CODES - import logging from datetime import datetime +import asyncio from airflow import DAG from airflow.decorators import task from dotenv import load_dotenv -from hivemind_etl_helpers.discord_mongo_summary_etl import process_discord_summaries -from hivemind_etl_helpers.discord_mongo_vector_store_etl import ( - process_discord_guild_mongo, -) +from hivemind_etl_helpers.website_etl import WebsiteETL from hivemind_etl_helpers.src.utils.modules import ModulesWebsite with DAG( dag_id="website_ingestion_embedding", start_date=datetime(2024, 1, 1), - schedule_interval="0 2 * * *", + schedule_interval="0 3 * * *", catchup=False, max_active_runs=1, ) as dag: @@ -33,66 +29,23 @@ def start_website_embedding(community_info: dict[str, str | datetime | list]): load_dotenv() community_id = community_info["community_id"] platform_id = community_info["platform_id"] - links = community_info["urls"] + urls = community_info["urls"] logging.info( f"Processing community_id: {community_id} | platform_id: {platform_id}" ) - process_discord_guild_mongo( - community_id=community_id, - platform_id=platform_id, - selected_channels=selected_channels, - default_from_date=from_date, - ) + website_etl = WebsiteETL(community_id=community_id) + + # Extract + raw_data = asyncio.run(website_etl.extract(urls=urls)) + # transform + documents = website_etl.transform(raw_data=raw_data) + # load into db + website_etl.load(documents=documents) + logging.info( f"Community {community_id} Job finished | platform_id: {platform_id}" ) communities_info = get_website_communities() - start_discord_vectorstore.expand(community_info=communities_info) - - -with DAG( - dag_id="discord_summary_vector_store", - start_date=datetime(2024, 1, 1), - schedule_interval="0 2 * * *", - catchup=False, - max_active_runs=1, -) as dag: - - @task - def get_mongo_discord_communities() -> list[dict[str, str | datetime | list]]: - """ - Getting all communities having discord from database - this function is the same with `get_discord_communities` - we just changed the name for the pylint - """ - communities = ModulesDiscord().get_learning_platforms() - return communities - - @task - def start_discord_summary_vectorstore( - community_info: dict[str, str | datetime | list] - ): - load_dotenv() - - community_id = community_info["community_id"] - platform_id = community_info["platform_id"] - selected_channels = community_info["selected_channels"] - from_date = community_info["from_date"] - logging.info( - f"Working on community, {community_id}| platform_id: {platform_id}" - ) - process_discord_summaries( - community_id=community_id, - platform_id=platform_id, - selected_channels=selected_channels, - default_from_date=from_date, - verbose=False, - ) - logging.info( - f"Community {community_id} Job finished | platform_id: {platform_id}" - ) - - communities = get_mongo_discord_communities() - start_discord_summary_vectorstore.expand(community_info=communities) + start_website_embedding.expand(community_info=communities_info) From 6a8f6a5fdd9c81b7de88e6e377cb393272877333 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 18 Nov 2024 14:30:12 +0330 Subject: [PATCH 08/13] feat: remove unused 
codes! --- .../tests/unit/test_website_etl.py | 11 ----------- dags/hivemind_etl_helpers/website_etl.py | 17 ----------------- 2 files changed, 28 deletions(-) diff --git a/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py b/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py index 49f08c30..18a403b5 100644 --- a/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py +++ b/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py @@ -79,14 +79,3 @@ def test_load(self): self.website_etl.ingestion_pipeline.run_pipeline.assert_called_once_with( docs=documents ) - - def test_prepare_id(self): - """ - Test the _prepare_id method. - """ - data = "example_data" - expected_hash = hashlib.sha256(data.encode("utf-8")).hexdigest() - - hashed_data = WebsiteETL._prepare_id(data) - - self.assertEqual(hashed_data, expected_hash) diff --git a/dags/hivemind_etl_helpers/website_etl.py b/dags/hivemind_etl_helpers/website_etl.py index 83551678..bef63471 100644 --- a/dags/hivemind_etl_helpers/website_etl.py +++ b/dags/hivemind_etl_helpers/website_etl.py @@ -88,20 +88,3 @@ def load(self, documents: list[Document]) -> None: """ # loading data into db self.ingestion_pipeline.run_pipeline(docs=documents) - - def _prepare_id(data: str) -> str: - """ - hash the given data to prepare an id for the document - - Parameters - ----------- - data : str - the string data to be hashed - - Returns - -------- - hashed_data : str - data hashed using sha256 algorithm - """ - hashed_data = hashlib.sha256(data.encode("utf-8")) - return hashed_data.hexdigest() From 9a2bbbbad50d84249c0d07703a93390fea74c62a Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 18 Nov 2024 14:44:33 +0330 Subject: [PATCH 09/13] fix: isort linter issues! --- dags/hivemind_etl_helpers/src/db/website/crawlee_client.py | 3 ++- dags/hivemind_etl_helpers/tests/unit/test_website_etl.py | 6 +++--- dags/hivemind_etl_helpers/website_etl.py | 3 +-- dags/hivemind_website_ingestion.py | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py index 829f0093..c1eff3f1 100644 --- a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py +++ b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py @@ -1,5 +1,6 @@ -from typing import Any import asyncio +from typing import Any + from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext from defusedxml import ElementTree as ET diff --git a/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py b/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py index 18a403b5..e625f1b3 100644 --- a/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py +++ b/dags/hivemind_etl_helpers/tests/unit/test_website_etl.py @@ -1,9 +1,9 @@ from unittest import IsolatedAsyncioTestCase -from dotenv import load_dotenv from unittest.mock import AsyncMock, MagicMock -from llama_index.core import Document + +from dotenv import load_dotenv from hivemind_etl_helpers.website_etl import WebsiteETL -import hashlib +from llama_index.core import Document class TestWebsiteETL(IsolatedAsyncioTestCase): diff --git a/dags/hivemind_etl_helpers/website_etl.py b/dags/hivemind_etl_helpers/website_etl.py index bef63471..7d58a408 100644 --- a/dags/hivemind_etl_helpers/website_etl.py +++ b/dags/hivemind_etl_helpers/website_etl.py @@ -1,9 +1,8 @@ from typing import Any -import hashlib from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline -from llama_index.core 
import Document from hivemind_etl_helpers.src.db.website.crawlee_client import CrawleeClient +from llama_index.core import Document class WebsiteETL: diff --git a/dags/hivemind_website_ingestion.py b/dags/hivemind_website_ingestion.py index 5f39f54d..fdb10dd2 100644 --- a/dags/hivemind_website_ingestion.py +++ b/dags/hivemind_website_ingestion.py @@ -1,12 +1,12 @@ +import asyncio import logging from datetime import datetime -import asyncio from airflow import DAG from airflow.decorators import task from dotenv import load_dotenv -from hivemind_etl_helpers.website_etl import WebsiteETL from hivemind_etl_helpers.src.utils.modules import ModulesWebsite +from hivemind_etl_helpers.website_etl import WebsiteETL with DAG( dag_id="website_ingestion_embedding", @@ -42,7 +42,7 @@ def start_website_embedding(community_info: dict[str, str | datetime | list]): documents = website_etl.transform(raw_data=raw_data) # load into db website_etl.load(documents=documents) - + logging.info( f"Community {community_id} Job finished | platform_id: {platform_id}" ) From 7710f0541761fd58402b169d8e8e15854793dcd2 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 19 Nov 2024 10:18:47 +0330 Subject: [PATCH 10/13] fix: using a fixed pydantic version! There was a new pre-release version that was causing errors --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 4359de97..c4b8f11c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,5 @@ llama-index-readers-file==0.1.22 docx2txt==0.8 tc-analyzer-lib==1.4.13 crawlee[playwright]==0.3.8 -defusedxml==0.7.1 \ No newline at end of file +defusedxml==0.7.1 +pydantic==2.9.2 \ No newline at end of file From fd299acaf3d7638509ad67486e26bd2d5a7a5572 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 19 Nov 2024 10:22:04 +0330 Subject: [PATCH 11/13] fix: wrong place for return statement! --- dags/hivemind_etl_helpers/src/db/website/crawlee_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py index c1eff3f1..94f86e36 100644 --- a/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py +++ b/dags/hivemind_etl_helpers/src/db/website/crawlee_client.py @@ -62,15 +62,14 @@ def _extract_links_from_sitemap(self, sitemap_content: str) -> list[str]: links : list[str] list of valid URLs extracted from the sitemap """ + links = [] try: root = ET.fromstring(sitemap_content) namespace = {"ns": "http://www.sitemaps.org/schemas/sitemap/0.9"} - links = [] for element in root.findall("ns:url/ns:loc", namespace): url = element.text.strip() if element.text else None if url and url.startswith(("http://", "https://")): links.append(url) - return links except ET.ParseError as e: raise ValueError(f"Invalid sitemap XML: {str(e)}") From 1fcb0b62c36e2d39c1d840e3025afabfd51b3095 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 19 Nov 2024 10:23:50 +0330 Subject: [PATCH 12/13] fix: get_website_communities docstring! 
--- dags/hivemind_website_ingestion.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dags/hivemind_website_ingestion.py b/dags/hivemind_website_ingestion.py index fdb10dd2..8dd06710 100644 --- a/dags/hivemind_website_ingestion.py +++ b/dags/hivemind_website_ingestion.py @@ -19,7 +19,13 @@ @task def get_website_communities() -> list[dict[str, str | datetime | list]]: """ - Getting all communities having discord from database + Retrieve all communities with associated website URLs from the database. + + Returns: + list[dict]: List of community information containing: + - community_id (str) + - platform_id (str) + - urls (list) """ communities = ModulesWebsite().get_learning_platforms() return communities From 6542ae5cd44b7b6d5bc324725978bfaa53a250d6 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Tue, 19 Nov 2024 10:28:46 +0330 Subject: [PATCH 13/13] feat: Added more error handlings! --- dags/hivemind_etl_helpers/website_etl.py | 5 +++++ dags/hivemind_website_ingestion.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dags/hivemind_etl_helpers/website_etl.py b/dags/hivemind_etl_helpers/website_etl.py index 7d58a408..e66958fd 100644 --- a/dags/hivemind_etl_helpers/website_etl.py +++ b/dags/hivemind_etl_helpers/website_etl.py @@ -42,8 +42,13 @@ async def extract( extracted_data : list[dict[str, Any]] The crawled data from urls """ + if not urls: + raise ValueError("No URLs provided for crawling") extracted_data = await self.crawlee_client.crawl(urls) + if not extracted_data: + raise ValueError(f"No data extracted from URLs: {urls}") + return extracted_data def transform(self, raw_data: list[dict[str, Any]]) -> list[Document]: diff --git a/dags/hivemind_website_ingestion.py b/dags/hivemind_website_ingestion.py index 8dd06710..52e26fcd 100644 --- a/dags/hivemind_website_ingestion.py +++ b/dags/hivemind_website_ingestion.py @@ -20,7 +20,7 @@ def get_website_communities() -> list[dict[str, str | datetime | list]]: """ Retrieve all communities with associated website URLs from the database. - + Returns: list[dict]: List of community information containing: - community_id (str)