Commit

Merge pull request #43 from ES2-UFPI/orquestracao-de-scraping-#8

Orquestracao de scraping #8

vitin-m authored Mar 2, 2023
2 parents c0ed7f9 + 5b9097e commit f3d74e7
Showing 18 changed files with 445 additions and 35 deletions.
73 changes: 73 additions & 0 deletions src/Scraper/application/ScraperOrchestration/Wrapper.py
@@ -0,0 +1,73 @@
from sqlalchemy.orm.session import Session
from typing import List
from random import uniform
import asyncio

from SearchEngine.infrastructure.ComponentManagment.SQL_alchemy_repository import (
SQLAlchemyRepository,
)
from Scraper.domain.value_object import AbstractScraper, URL
from Scraper.domain.category_url import CategoryURL
from Scraper.domain.aggragate import VolatileData
from Scraper.domain.service import FactoryScraper
from framework.domain.value_object import UUIDv5
from entrypoints.api.endpoints.connection_util import engine
from framework.infrastructure.db_management.db_connection import create_session
from Scraper.infrastructure.VolatileDataManagment.SQL_alchemy_volatile_data import (
SQLAlchemyVolatileData,
)
from Scraper.infrastructure.ScraperOrchestration.category_URL_manager import (
CategoryURLManager,
)


class Wrapper:
_volatile_data_manager: SQLAlchemyVolatileData
domain: str
scraper: AbstractScraper
domain_urls: List[CategoryURL]
session: Session

max_sleep_seconds = 3

def __init__(self, scheme: str, domain: str):
self.domain = domain
self.session = create_session(engine)
self._volatile_data_manager = SQLAlchemyVolatileData(self.session)

factory_scraper = FactoryScraper()
url_manager = CategoryURLManager(self.session)
self.scraper = factory_scraper.build_scraper(f"{scheme}://{domain}")
self.domain_urls = url_manager.get(filters_eq={"domain": domain})

async def run_scraping(self):
for domain_url in self.domain_urls:
next_url: URL = domain_url.url

while next_url is not None:
next_url, volatile_datas = self.scraper.get_volatile_data(
url=next_url.url
)

for url, name, cost, availability in volatile_datas:
# TODO: call the search engine to classify the component
# component = SearchEngine.classifie(name)
component_manager = SQLAlchemyRepository(
self.session
) # placeholder
component = component_manager.get(filters_gt={"consumption": -1})[
0
] # placeholder

volatile_data = VolatileData(
_id=UUIDv5(url.url),
component_id=component.uid,
url=url,
cost=cost,
availability=availability,
)

self._volatile_data_manager.add(volatile_data)

sleep_seconds = uniform(1.0, self.max_sleep_seconds)
await asyncio.sleep(sleep_seconds)
103 changes: 103 additions & 0 deletions src/Scraper/application/ScraperOrchestration/category_URL_manager.py
@@ -0,0 +1,103 @@
from sqlalchemy.orm.session import Session
from sqlalchemy.exc import NoResultFound
from typing import List
from operator import lt, gt, eq

from framework.domain.value_object import UUID
from framework.infrastructure.db_management.db_structure import CategoryUrlInstance
from Scraper.domain.category_url import CategoryURL
from framework.domain.value_object import URL
from framework.domain.components import EComponentType
from Scraper.domain.repositories import (
ISQLAlchemyRepository,
EntityUIDNotFoundException,
)


class CategoryURLManager(ISQLAlchemyRepository):
_filters_ops: dict = {"filters_eq": eq, "filters_lt": lt, "filters_gt": gt}

def __init__(self, session):
self._session: Session = session

def _url_to_db(self, category_url: CategoryURL):
url_instance = CategoryUrlInstance()
url = category_url.url

url_instance.uid = category_url.uid
url_instance.domain = url.domain
url_instance.path = url.path
url_instance.scheme = url.scheme
url_instance.type = category_url.category

return url_instance

def _db_to_category_url(self, url_instance: CategoryUrlInstance):
url_str = f"{url_instance.scheme}://{url_instance.domain}/{url_instance.path}"
url = URL(url_str, url_instance.scheme, url_instance.domain, url_instance.path)
category_url = CategoryURL(
_id=url_instance.uid, url=url, category=url_instance.type
)

return category_url

def _parse_filters(self, **kwargs) -> List:
ret = []

for filter_type, filters in kwargs.items():
if filter_type in self._filters_ops.keys():
op = self._filters_ops[filter_type]

for prop, value in filters.items():
ret.append(op(getattr(CategoryUrlInstance, prop), value))

return ret

def _filter_components_from_db(self, filters: List) -> List[CategoryURL]:
url_instances: List[CategoryUrlInstance] = (
self._session.query(CategoryUrlInstance).filter(*filters).all()
)

urls = [self._db_to_category_url(instance) for instance in url_instances]

return urls

def _add(self, category_url: CategoryURL):
url_instance = self._url_to_db(category_url)
self._session.add(url_instance)
self._session.commit()

def _get(self, **kwargs) -> List[CategoryURL]:
ret = []

filters = self._parse_filters(**kwargs)

urls = self._filter_components_from_db(filters)
ret.extend(urls)

return ret

def _get_by_uid(self, ref: UUID):
query_filter = [CategoryUrlInstance.uid == ref]

try:
category_url: CategoryUrlInstance = (
self._session.query(CategoryUrlInstance).filter(*query_filter).one()
)

url = self._db_to_category_url(category_url)

except NoResultFound:
raise EntityUIDNotFoundException(ref)

return url

def get_urls(self) -> List[tuple[str, str]]:
params = [CategoryUrlInstance.scheme, CategoryUrlInstance.domain]

query = self._session.query(*params).distinct(CategoryUrlInstance.domain)
urls = [url for url in query]
return urls
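
A minimal usage sketch of the filter convention above (the domain value is hypothetical, and get is assumed to be the public wrapper around _get, as called from Wrapper): filters_eq, filters_lt and filters_gt map to operator.eq, lt and gt applied to the named CategoryUrlInstance columns.

from framework.infrastructure.db_management.db_connection import create_session
from entrypoints.api.endpoints.connection_util import engine

# Hypothetical example: fetch every category URL registered for one store domain.
manager = CategoryURLManager(create_session(engine))
category_urls = manager.get(filters_eq={"domain": "example.com"})
for category_url in category_urls:
    print(category_url.url, category_url.category)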
39 changes: 39 additions & 0 deletions src/Scraper/application/ScraperOrchestration/orchestrator.py
@@ -0,0 +1,39 @@
import sys

sys.path.insert(0, r"C:\Users\wesle\OneDrive\Documentos\UFPI\ESII\WiseBuilder\src")

import asyncio
from Scraper.infrastructure.ScraperOrchestration.category_URL_manager import (
CategoryURLManager,
)
from framework.infrastructure.db_management.db_connection import create_session
from entrypoints.api.endpoints.connection_util import engine
from Scraper.infrastructure.ScraperOrchestration.Wrapper import Wrapper

_category_url_manager = CategoryURLManager(create_session(engine))
_sleep_minutes = 60


async def run_scrapers():
while True:
urls = _category_url_manager.get_urls()

tasks = []

for scheme, domain in urls:
wrapper = Wrapper(scheme, domain)
tasks.append(wrapper.run_scraping())

await asyncio.gather(*tasks)
await asyncio.sleep(_sleep_minutes * 60)


def main():
orchestrator_loop = asyncio.new_event_loop()
asyncio.set_event_loop(orchestrator_loop)
orchestrator_loop.run_until_complete(run_scrapers())
orchestrator_loop.close()


if __name__ == "__main__":
main()
18 changes: 13 additions & 5 deletions src/Scraper/domain/aggragate.py
@@ -1,33 +1,41 @@
from datetime import datetime
from dataclasses import dataclass, field
from typing import List

from framework.domain.entity import AggregateRoot
from framework.domain.value_object import UUID, Money, URL
from .entity import MatchesTrackedComponent

_AttrsVolatileData = ["_id", "url", "component_id", "cost", "availability", "timestamp"]


@dataclass(kw_only=True)
class VolatileData(AggregateRoot):
component_id: UUID
url_id: UUID
# url_id: UUID
url: URL
component_id: UUID
cost: Money
availability: bool

timestamp: datetime = field(default_factory=datetime.utcnow)

def __hash__(self):
return hash(self.uid)

@classmethod
def get_attrs(cls) -> List[str]:
return _AttrsVolatileData.copy()

def generateVolatileDataPoint(
self,
_id: UUID,
component_id: UUID,
url_id: UUID,
url: URL,
cost: Money,
availability: bool,
):
return VolatileData(
_id=_id,
component_id=component_id,
url_id=url_id,
url=url,
cost=cost,
availability=availability,
21 changes: 13 additions & 8 deletions src/Scraper/domain/entity.py
@@ -1,9 +1,14 @@
from dataclasses import dataclass

from framework.domain.entity import Entity
from framework.domain.value_object import URL
from framework.domain.components import EComponentType


@dataclass
class MatchesTrackedComponent(Rule):
# Checks whether the component is one of the tracked components
component_name: str

def is_broken(self) -> bool:
# check whether the component exists
# return not SearchEngine.get_id_by_name(component_name)
return False
class CategoryURL(Entity):
url: URL
category: EComponentType

def __hash__(self):
return hash(self.uid)
70 changes: 70 additions & 0 deletions src/Scraper/domain/repositories.py
@@ -0,0 +1,70 @@
from dataclasses import dataclass, field
from typing import Dict
from abc import ABCMeta, abstractmethod

from framework.domain.value_object import UUID
from framework.domain.repository import AbstractRepository
from framework.domain.exception import DomainException
from Scraper.domain.aggragate import VolatileData


@dataclass
class EntityUIDNotFoundException(DomainException):
entity_id: UUID
_message: str = field(init=False)

def __post_init__(self):
self._message = (
f"{self.__class__.__name__}: "
f"Componente com UID {self.entity_id} não existe."
)


class MockRepository(AbstractRepository):
def __init__(self, volatile_datas: Dict[UUID, VolatileData]):
self._volatile_data = volatile_datas

def _add(self, volatile_data: VolatileData):
self._volatile_data[volatile_data.uid] = volatile_data

def _get_by_uid(self, ref: UUID):
ret = self._volatile_data.get(ref, None)
if ret:
return self._volatile_data[ref]
raise EntityUIDNotFoundException(ref)

def _get(self, **kwargs):
qsize = kwargs.get("qsize", 10)
ctype = kwargs.get("availability", None)

ret = list()
if ctype:
for v in self._volatile_data.values():
if v.availability:
ret.append(v)
if len(ret) == qsize:
break

return ret

def __repr__(self):
return str(self._volatile_data)


class ISQLAlchemyRepository(AbstractRepository, metaclass=ABCMeta):
@abstractmethod
def __init__(self, session):
raise NotImplementedError

@abstractmethod
def _add(self, volatile_data: VolatileData):
raise NotImplementedError

@abstractmethod
def _get_by_uid(self, ref: UUID):
raise NotImplementedError

@abstractmethod
def _get(self, **kwargs):
raise NotImplementedError

def __repr__(self):
raise NotImplementedError
5 changes: 4 additions & 1 deletion src/Scraper/domain/scrapers.py
@@ -62,11 +62,14 @@ def get_volatile_data(

n_next_page = n_actual_page + 1

next_url: URL | None = None

if n_next_page in number_of_pages:
next_page = url.split("?")[0] + self.query_string.format(
page_number=n_next_page
)
next_url = URL.get_URL(next_page)
else:
next_page = None

return next_page, volatile_data
return next_url, volatile_data
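
A consumption sketch of the new pagination contract (the domain and starting page are hypothetical, and build_scraper only resolves domains registered in FactoryScraper): get_volatile_data now returns a URL object, or None once the last page is reached, which is what ends the pagination loop in Wrapper.run_scraping.

from Scraper.domain.service import FactoryScraper
from framework.domain.value_object import URL

# Hypothetical domain and starting page; Wrapper normally gets these from CategoryURLManager.
scraper = FactoryScraper().build_scraper("https://www.example.com")
next_url = URL.get_URL("https://www.example.com/hardware?page_number=1")
while next_url is not None:
    next_url, volatile_data = scraper.get_volatile_data(url=next_url.url)
    for url, name, cost, availability in volatile_data:
        print(name, cost, availability)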
7 changes: 5 additions & 2 deletions src/Scraper/domain/service.py
@@ -1,11 +1,14 @@
from dataclasses import dataclass, field
from .value_object import AbstractScraper
from typing import Dict, Union, NoReturn
from typing import Dict, Union, NoReturn, Type
from Scraper.domain.scrapers import KabumScraper


@dataclass
class FactoryScraper:
_scrapers: Dict[str, AbstractScraper] = field(init=False, default_factory=dict)
_scrapers: Dict[str, AbstractScraper] = field(
init=False, default_factory=lambda: {KabumScraper.raw_url: KabumScraper()}
)

def build_scraper(self, domain: str) -> Union[AbstractScraper, NoReturn]:
_scraper: AbstractScraper | None = self._scrapers.get(domain)
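
A short usage sketch of the updated factory (the URL string is illustrative and only resolves if it equals KabumScraper.raw_url, whose value is not shown in this diff): the default_factory now pre-registers KabumScraper under its raw_url key, so build_scraper can hand it out for a matching scheme://domain.

from Scraper.domain.service import FactoryScraper

factory_scraper = FactoryScraper()
# Assumption: the string below equals KabumScraper.raw_url; an unregistered
# domain is expected to make build_scraper fail or return nothing.
scraper = factory_scraper.build_scraper("https://www.kabum.com.br")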