Skip to content

Commit

Permalink
feat: Add Subtitulamos provider integration - mypy fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Nyaran committed Oct 6, 2024
1 parent 73dbcb3 commit 9a8def3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 57 deletions.
6 changes: 3 additions & 3 deletions subliminal/converters/subtitulamos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from typing import TYPE_CHECKING

from babelfish import LanguageReverseConverter, language_converters
from babelfish import LanguageReverseConverter, language_converters # type: ignore[import-untyped]

if TYPE_CHECKING:
from . import LanguageTuple
Expand Down Expand Up @@ -37,11 +37,11 @@ def convert(self, alpha3: str, country: str | None = None, script: str | None =
if (alpha3,) in self.to_subtitulamos:
return self.to_subtitulamos[(alpha3,)]

return self.name_converter.convert(alpha3, country, script)
return self.name_converter.convert(alpha3, country, script) # type: ignore[no-any-return]

def reverse(self, code: str) -> LanguageTuple:
"""Reverse a custom code into alpha3, country and script code."""
if code in self.from_subtitulamos:
return self.from_subtitulamos[code]

return self.name_converter.reverse(code)
return self.name_converter.reverse(code) # type: ignore[no-any-return]
101 changes: 47 additions & 54 deletions subliminal/providers/subtitulamos.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@
import contextlib
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, cast

from babelfish import Language, language_converters # type: ignore[import-untyped]
from guessit import guessit
from guessit import guessit # type: ignore[import-untyped]
from requests import Session

from subliminal import __short_version__
from subliminal.cache import SHOW_EXPIRATION_TIME, region
from subliminal.exceptions import ProviderError
from subliminal.exceptions import NotInitializedProviderError, ProviderError
from subliminal.matches import guess_matches
from subliminal.score import get_equivalent_release_groups
from subliminal.subtitle import Subtitle, fix_line_ending
from subliminal.utils import sanitize, sanitize_release_group
from subliminal.video import Episode

from . import ParserBeautifulSoup, Provider
Expand Down Expand Up @@ -63,40 +61,20 @@ def __init__(
self.release_group = release_group
self.download_link = download_link

@property
def id(self) -> str:
"""Unique identifier of the subtitle."""
return self.download_link

def get_matches(self, video: Video) -> set[str]:
"""Get the matches against the `video`."""
matches = set()

# series
if video.series and sanitize(self.series) == sanitize(video.series):
matches.add('series')
# season
if video.season and self.season == video.season:
matches.add('season')
# episode
if video.episode and self.episode == video.episode:
matches.add('episode')
# title
if video.title and sanitize(self.title) == sanitize(video.title):
matches.add('title')
# year
if video.original_series and self.year is None or video.year and video.year == self.year:
matches.add('year')
# release_group
if (
video.release_group
and self.release_group
and any(
r in sanitize_release_group(self.release_group)
for r in get_equivalent_release_groups(sanitize_release_group(video.release_group))
)
):
matches.add('release_group')
matches = guess_matches(
video,
{
'title': self.series,
'season': self.season,
'episode': self.episode,
'episode_title': self.title,
'year': self.year,
'release_group': self.release_group,
},
)

# resolution
if video.resolution and self.release_group and video.resolution in self.release_group.lower():
matches.add('resolution')
Expand All @@ -117,6 +95,7 @@ class SubtitulamosProvider(Provider):
video_types = (Episode,)
server_url = 'https://www.subtitulamos.tv'
search_url = server_url + '/search/query'
session: Session | None

def __init__(self) -> None:
self.session = None
Expand All @@ -128,11 +107,16 @@ def initialize(self) -> None:

def terminate(self) -> None:
"""Terminate the provider."""
if not self.session:
raise NotInitializedProviderError
self.session.close()
self.session = None

def _session_request(self, *args: Any, **kwargs: Any) -> Response:
"""Perform a GET request to the provider."""
if not self.session:
raise NotInitializedProviderError

r = self.session.get(*args, **kwargs)
r.raise_for_status()

Expand All @@ -147,7 +131,8 @@ def _query_search(self, search_param: str) -> list[dict[str, str]]:
r = self._session_request(
self.search_url, headers={'Referer': self.server_url}, params={'q': search_param}, timeout=10
)
return json.loads(r.text)
data = json.loads(r.text)
return cast(list[dict[str, str]], data)

def _read_series(self, series_url: str) -> ParserBeautifulSoup:
"""Read series information from provider."""
Expand All @@ -160,28 +145,25 @@ def _get_episode_url(self, series_id: str, season: int, episode: int) -> str | N

for season_element in series_content.select('#season-choices a.choice'):
if season == int(season_element.get_text()):
if 'selected' not in season_element.get('class'):
series_content = self._read_series(season_element['href'])
if 'selected' not in (list[str], season_element.get('class', [])):
series_content = self._read_series(cast(str, season_element.get('href', '')))
break
return None

for episode_element in series_content.select('#episode-choices a.choice'):
if episode == int(episode_element.get_text()):
return episode_element['href']
return cast(str, episode_element.get('href', ''))
return None

@region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
def _search_url_titles(
self, series: str | None = None, season: int | None = None, episode: int | None = None, year: int | None = None
) -> str:
def _search_url_titles(self, series: str, season: int, episode: int, year: int | None = None) -> str | None:
"""Search the URL titles by kind for the given `title`, `season` and `episode`.
:param str series: series to search for.
:param int season: season to search for.
:param int episode: episode to search for.
:param int year: year to search for.
:return: the episode URL.
:rtype: str
:param str series: Series to search for.
:param int season: Season to search for.
:param int episode: Episode to search for.
:param int year: Year to search for.
:return: The episode URL.
"""
logger.info('Searching episode url for %s, season %d, episode %d', series, season, episode)
Expand All @@ -192,14 +174,16 @@ def _search_url_titles(
series_response = self._query_search(series)

episode_url = self._get_episode_url(series_response[0]['show_id'], season, episode)
if episode_url:
return self.server_url + episode_url
return None

return self.server_url + episode_url if episode_url else None

def query(
self, series: str | None = None, season: int | None = None, episode: int | None = None, year: int | None = None
) -> list[SubtitulamosSubtitle]:
"""Query the provider for subtitles."""
if not self.session:
raise NotInitializedProviderError

# get the episode url
episode_url = self._search_url_titles(series, season, episode, year)
if episode_url is None:
Expand Down Expand Up @@ -233,7 +217,7 @@ def query(
release_group = version_container.select('.version-container .text.spaced')[0].getText()

# read the subtitle url
subtitle_url = self.server_url + sub.parent['href']
subtitle_url = self.server_url + cast(str, sub.parent.get('href', ''))
subtitle = SubtitulamosSubtitle(
language,
hearing_impaired,
Expand All @@ -253,10 +237,19 @@ def query(

def list_subtitles(self, video: Video, languages: Set[Language]) -> list[SubtitulamosSubtitle]:
"""List all the subtitles for the video."""
if not isinstance(video, Episode):
return []

return [s for s in self.query(video.series, video.season, video.episode, video.year) if s.language in languages]

def download_subtitle(self, subtitle: SubtitulamosSubtitle) -> None:
"""Download the content of the subtitle."""
if not self.session:
raise NotInitializedProviderError

if not subtitle.download_link:
return

logger.info('Downloading subtitle %s', subtitle.download_link)
r = self.session.get(subtitle.download_link, headers={'Referer': subtitle.page_link}, timeout=10)
r.raise_for_status()
Expand Down

0 comments on commit 9a8def3

Please sign in to comment.