diff --git a/Dockerfile b/Dockerfile
index 7ab04fc..f4b6810 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -16,5 +16,6 @@ RUN ["/bin/bash", "-c", "set -o pipefail && curl -sSL https://install.python-poe
 COPY ./UltimaScraper .
 RUN /usr/local/share/pypoetry/bin/poetry update
 RUN sed -i '/if asset.urls:/a\ if asset.__content_metadata__.__soft__.get_author().id == authed.id:\n continue' /usr/local/lib/python3.10/site-packages/ultima_scraper_collection/managers/datascraper_manager/datascrapers/onlyfans.py
+COPY ./__init__.py /usr/local/lib/python3.10/site-packages/ultima_scraper_api/apis/onlyfans
 CMD [ "/usr/local/share/pypoetry/bin/poetry", "run", "python", "./start_us.py" ]
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..bf8f815
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Literal
+from urllib.parse import urlparse
+
+SubscriptionType = Literal["all", "active", "expired", "attention"]
+
+if TYPE_CHECKING:
+    from ultima_scraper_api.apis.onlyfans.classes.user_model import (
+        AuthModel,
+        create_user,
+    )
+
+
+class SiteContent:
+    def __init__(self, option: dict[str, Any], user: AuthModel | create_user) -> None:
+        self.id: int = option["id"]
+        self.author = user
+        self.media: list[dict[str, Any]] = option.get("media", [])
+        self.preview_ids: list[int] = []
+        self.__raw__ = option
+
+    def url_picker(self, media_item: dict[str, Any], video_quality: str = ""):
+        authed = self.get_author().get_authed()
+        video_quality = (
+            video_quality or self.author.get_api().get_site_settings().video_quality
+        )
+        if not media_item["canView"]:
+            return
+        source: dict[str, Any] = {}
+        media_type: str = ""
+        if "files" in media_item:
+            media_type = media_item["type"]
+            media_item = media_item["files"]
+            source = media_item["full"]
+        else:
+            return
+        url = source.get("url")
+        if authed.drm:
+            has_drm = authed.drm.has_drm(media_item)
+            if has_drm:
+                url = has_drm
+        return urlparse(url) if url else None
+
+    def preview_url_picker(self, media_item: dict[str, Any]):
+        preview_url = None
+        if "files" in media_item:
+            if (
+                "preview" in media_item["files"]
+                and "url" in media_item["files"]["preview"]
+            ):
+                preview_url = media_item["files"]["preview"]["url"]
+            else:
+                preview_url = media_item["preview"]
+        return urlparse(preview_url) if preview_url else None
+
+    def get_author(self):
+        return self.author
+
+    async def refresh(self):
+        func = await self.author.scrape_manager.handle_refresh(self)
+        return await func(self.id)
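
For reference, the media-payload shape that the overridden __init__.py consumes can be exercised outside the container. The sketch below is a minimal, self-contained approximation of the url_picker / preview_url_picker selection logic from the diff above; it does not import ultima_scraper_api, it omits the DRM and authed/session branches, and the sample payload values are invented for illustration.

    from typing import Any
    from urllib.parse import urlparse


    def pick_full_url(media_item: dict[str, Any]):
        # Mirrors url_picker above: only viewable items with a "files" -> "full" entry yield a URL.
        if not media_item.get("canView"):
            return None
        files = media_item.get("files")
        if not files:
            return None
        url = files.get("full", {}).get("url")
        return urlparse(url) if url else None


    def pick_preview_url(media_item: dict[str, Any]):
        # Mirrors preview_url_picker above: prefer files["preview"]["url"], else fall back to "preview".
        files = media_item.get("files")
        if not files:
            return None
        preview = files.get("preview", {})
        preview_url = preview.get("url") if "url" in preview else media_item.get("preview")
        return urlparse(preview_url) if preview_url else None


    # Hypothetical payload in the shape the patched methods expect (values are made up).
    sample = {
        "canView": True,
        "type": "video",
        "files": {
            "full": {"url": "https://cdn.example.com/full.mp4"},
            "preview": {"url": "https://cdn.example.com/preview.jpg"},
        },
    }

    print(pick_full_url(sample).geturl())     # https://cdn.example.com/full.mp4
    print(pick_preview_url(sample).geturl())  # https://cdn.example.com/preview.jpg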