diff --git a/README.md b/README.md index 5cfec5d..89ea82a 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,16 @@ You enjoy getting games for free but you *don’t* enjoy having to keep track of the various sources (Amazon Prime, Epic Games, Steam, ...) for free offers? Also your F5 key starts to look a bit worn out? Then this is for you! -This Python (3.10+) application uses Selenium to automatically visit sites with free gaming related offers (currently Amazon Prime, Epic Games and Steam are supported, more will follow) and then neatly puts the gathered information into RSS feeds. So now you can track the offers using your favorite news reader like Feedly instead of manually visiting the sites. +This Python (3.10+) application uses Selenium to automatically visit sites with free gaming related offers (currently Amazon Prime, Epic Games and Steam are supported, more will follow) and then neatly puts the gathered information into RSS feeds and a Telegram bot. So now you can track the offers using your favorite news reader like Feedly instead of manually visiting the sites or get a message every time a new offer is available. ## Usage You can either run this script locally on your computer or in any environment capable of running a Docker container. Just want the feeds? Sure. You can use the links below. They are updated every 20 minutes. + +If you prefer Telegram, you can instead subscribe to the [Telegram LootScraperBot](https://t.me/LootScraperBot) to get push notifications for new offers. + - Amazon Prime ([games](https://feed.phenx.de/lootscraper_amazon_game.xml) and [ingame loot](https://feed.phenx.de/lootscraper_amazon_loot.xml)) - [Epic Games (games only)](https://feed.phenx.de/lootscraper_epic_game.xml) - [Gog (games only)](https://feed.phenx.de/lootscraper_gog_game.xml) diff --git a/alembic/versions/20220419_124703_split_game_information.py b/alembic/versions/20220419_124703_split_game_information.py index 916668c..34f0f09 100644 --- a/alembic/versions/20220419_124703_split_game_information.py +++ b/alembic/versions/20220419_124703_split_game_information.py @@ -6,13 +6,13 @@ """ from typing import Any -from alembic import op -import sqlalchemy as sa -from app.sqlalchemy import AwareDateTime +import sqlalchemy as sa from sqlalchemy import orm from sqlalchemy.ext.declarative import declarative_base +from alembic import op +from app.sqlalchemy import AwareDateTime # revision identifiers, used by Alembic. revision = "038c26b62555" diff --git a/alembic/versions/20220420_144219_user_table.py b/alembic/versions/20220420_144219_user_table.py new file mode 100644 index 0000000..44b2826 --- /dev/null +++ b/alembic/versions/20220420_144219_user_table.py @@ -0,0 +1,51 @@ +"""User table + +Revision ID: 8cfaaf08b306 +Revises: 038c26b62555 +Create Date: 2022-04-20 14:42:19.402926+00:00 + +""" +import sqlalchemy as sa + +from alembic import op +from app.sqlalchemy import AwareDateTime + +# revision identifiers, used by Alembic. +revision = "8cfaaf08b306" +down_revision = "038c26b62555" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "users", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("registration_date", AwareDateTime(), nullable=True), + sa.Column("offers_received_count", sa.Integer(), nullable=True), + sa.Column("telegram_id", sa.Integer(), nullable=True), + sa.Column("telegram_chat_id", sa.Integer(), nullable=True), + sa.Column("telegram_user_details", sa.JSON(), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + op.create_table( + "telegram_subscriptions", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column( + "source", + sa.Enum("AMAZON", "EPIC", "STEAM", "GOG", name="source"), + nullable=False, + ), + sa.Column("type", sa.Enum("LOOT", "GAME", name="offertype"), nullable=False), + sa.ForeignKeyConstraint( + ["user_id"], + ["users.id"], + ), + sa.PrimaryKeyConstraint("id"), + ) + + +def downgrade() -> None: + op.drop_table("telegram_subscriptions") + op.drop_table("users") diff --git a/alembic/versions/20220422_090529_last_sent_offer.py b/alembic/versions/20220422_090529_last_sent_offer.py new file mode 100644 index 0000000..9f480ac --- /dev/null +++ b/alembic/versions/20220422_090529_last_sent_offer.py @@ -0,0 +1,45 @@ +"""Last sent offer + +Revision ID: 52ea632ee417 +Revises: 8cfaaf08b306 +Create Date: 2022-04-22 09:05:29.092417+00:00 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "52ea632ee417" +down_revision = "8cfaaf08b306" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + with op.batch_alter_table("offers", schema=None) as batch_op: # type: ignore + batch_op.alter_column("seen_first", existing_type=sa.DATETIME(), nullable=False) + batch_op.alter_column("seen_last", existing_type=sa.DATETIME(), nullable=False) + + with op.batch_alter_table( # type: ignore + "telegram_subscriptions", schema=None + ) as batch_op: + batch_op.add_column(sa.Column("last_offer_id", sa.Integer(), nullable=True)) + + op.execute("UPDATE telegram_subscriptions SET last_offer_id = 0") + + with op.batch_alter_table( # type: ignore + "telegram_subscriptions", schema=None + ) as batch_op: + batch_op.alter_column( + "last_offer_id", existing_type=sa.Integer(), nullable=False + ) + + +def downgrade() -> None: + with op.batch_alter_table("telegram_subscriptions", schema=None) as batch_op: # type: ignore + batch_op.drop_column("last_offer_id") + + with op.batch_alter_table("offers", schema=None) as batch_op: # type: ignore + batch_op.alter_column("seen_last", existing_type=sa.DATETIME(), nullable=True) + batch_op.alter_column("seen_first", existing_type=sa.DATETIME(), nullable=True) diff --git a/app/common.py b/app/common.py index f274aef..c7ef56f 100644 --- a/app/common.py +++ b/app/common.py @@ -2,7 +2,7 @@ TIMESTAMP_SHORT = "%Y-%m-%d" TIMESTAMP_LONG = "%Y-%m-%d %H:%M:%S" -TIMESTAMP_READABLE_WITH_HOUR = "%Y-%m-%d - %H:%M (UTC)" +TIMESTAMP_READABLE_WITH_HOUR = "%Y-%m-%d - %H:%M UTC" class OfferType(Enum): @@ -15,3 +15,7 @@ class Source(Enum): EPIC = "Epic Games" STEAM = "Steam" GOG = "GOG" + + +def chunkstring(string: str, length: int) -> list[str]: + return list((string[0 + i : length + i] for i in range(0, len(string), length))) diff --git a/app/configparser.py b/app/configparser.py index 7ea59ee..92d53fa 100644 --- a/app/configparser.py +++ b/app/configparser.py @@ -37,6 +37,7 @@ class ParsedConfig: scrape_info: bool = True # Not used anywhere yet generate_feed: bool = True upload_feed: bool = False + telegram_bot: bool = False # Telegram telegram_access_token: str = "" @@ -118,6 +119,7 @@ def get() -> ParsedConfig: parsed_config.scrape_info = config.getboolean("actions", "ScrapeInfo") parsed_config.generate_feed = config.getboolean("actions", "GenerateFeed") parsed_config.upload_feed = config.getboolean("actions", "UploadFtp") + parsed_config.telegram_bot = config.getboolean("actions", "TelegramBot") parsed_config.telegram_access_token = config["telegram"]["AccessToken"] parsed_config.telegram_developer_chat_id = int( diff --git a/app/feed.py b/app/feed.py index 81784ee..ac4b431 100644 --- a/app/feed.py +++ b/app/feed.py @@ -116,7 +116,15 @@ def generate_feed( if game.steam_info.metacritic_url: text = f'{text}' ratings.append(text) - + if ( + game.steam_info + and game.steam_info.percent + and game.steam_info.score + and game.steam_info.recommendations + ): + text = f"Steam {game.steam_info.percent} % ({game.steam_info.score}/10, {game.steam_info.recommendations} recommendations)" + text = f'{text}' + ratings.append(text) if ( game.igdb_info and game.igdb_info.meta_ratings @@ -133,15 +141,6 @@ def generate_feed( text = f"IGDB User {game.igdb_info.user_score} % ({game.igdb_info.user_ratings} sources)" text = f'{text}' ratings.append(text) - if ( - game.steam_info - and game.steam_info.percent - and game.steam_info.score - and game.steam_info.recommendations - ): - text = f"Steam {game.steam_info.percent} % ({game.steam_info.score}/10, {game.steam_info.recommendations} recommendations)" - text = f'{text}' - ratings.append(text) if len(ratings) > 0: content += f"
  • Ratings: {' / '.join(ratings)}
  • " if game.igdb_info and game.igdb_info.release_date: @@ -159,8 +158,8 @@ def generate_feed( f"
  • Genres: {html.escape(game.steam_info.genres)}
  • " ) content += "" + content += "

    * Any information about the offer is automatically grabbed and may in rare cases not match the correct game.

    " - content += "

    * Any information about the offer is automatically grabbed and may in rare cases not match the correct game.

    " content += f'

    Source: {html.escape(offer.source.value)}, Seen first: {offer.seen_first.strftime(TIMESTAMP_LONG)}, Generated by LootScraper

    ' feed_entry.content(content, type="xhtml") # - Link diff --git a/app/scraper/info/igdb.py b/app/scraper/info/igdb.py index dac3168..2f6fcd5 100644 --- a/app/scraper/info/igdb.py +++ b/app/scraper/info/igdb.py @@ -1,7 +1,8 @@ import json import logging -from datetime import datetime, timezone import re +from datetime import datetime, timezone + import requests from igdb.wrapper import IGDBWrapper diff --git a/app/scraper/info/steam.py b/app/scraper/info/steam.py index d445bbf..f516347 100644 --- a/app/scraper/info/steam.py +++ b/app/scraper/info/steam.py @@ -26,9 +26,12 @@ STEAM_DETAILS_STORE = "https://store.steampowered.com/app/" STEAM_DETAILS_REVIEW_SCORE = '//div[@id="userReviews"]/div[@itemprop="aggregateRating"]' # data-tooltip-html attribute STEAM_DETAILS_REVIEW_SCORE_VALUE = '//div[@id="userReviews"]/div[@itemprop="aggregateRating"]//meta[@itemprop="ratingValue"]' # content attribute +STEAM_DETAILS_REVIEW_COUNT = '//div[@id="userReviews"]/div[@itemprop="aggregateRating"]//meta[@itemprop="reviewCount"]' # content attribute STEAM_DETAILS_LOADED = '//div[contains(concat(" ", normalize-space(@class), " "), " game_page_background ")]' STEAM_PRICE_FULL = '(//div[contains(concat(" ", normalize-space(@class), " "), " game_area_purchase_game ")])[1]//div[contains(concat(" ", normalize-space(@class), " "), " game_purchase_action")]//div[contains(concat(" ", normalize-space(@class), " "), " game_purchase_price")]' # text=" 27,99€ " STEAM_PRICE_DISCOUNTED_ORIGINAL = '(//div[contains(concat(" ", normalize-space(@class), " "), " game_area_purchase_game ")])[1]//div[contains(concat(" ", normalize-space(@class), " "), " game_purchase_action")]//div[contains(concat(" ", normalize-space(@class), " "), " discount_original_price")]' # text=" 27,99€ " +STEAM_RELEASE_DATE = '//div[@id="genresAndManufacturer"]' + MAX_WAIT_SECONDS = 30 # Needs to be quite high in Docker for first run @@ -112,7 +115,7 @@ def get_steam_details( # No entry found, not adding any data return None - logging.info(f"Steam: Reading details for app id {id}") + logging.info(f"Steam: Reading details for app id {steam_app_id}") steam_info = SteamInfo() steam_info.id = steam_app_id @@ -242,10 +245,8 @@ def get_steam_details( element: WebElement = driver.find_element(By.XPATH, STEAM_DETAILS_REVIEW_SCORE) rating_str: str = element.get_attribute("data-tooltip-html") # type: ignore steam_info.percent = int(rating_str.split("%")[0].strip()) - except WebDriverException: logging.error(f"No Steam percentage found for {steam_app_id}!") - except ValueError: logging.error(f"Invalid Steam percentage {rating_str} for {steam_app_id}!") @@ -254,51 +255,74 @@ def get_steam_details( By.XPATH, STEAM_DETAILS_REVIEW_SCORE_VALUE ) rating2_str: str = element2.get_attribute("content") # type: ignore - try: - steam_info.score = int(rating2_str) - except ValueError: - pass - + steam_info.score = int(rating2_str) except WebDriverException: logging.error(f"No Steam rating found for {steam_app_id}!") - except ValueError: logging.error(f"Invalid Steam rating {rating2_str} for {steam_app_id}!") + if steam_info.recommendations is None: + try: + element6: WebElement = driver.find_element( + By.XPATH, STEAM_DETAILS_REVIEW_COUNT + ) + recommendations_str: str = element6.get_attribute("content") # type: ignore + steam_info.recommendations = int(recommendations_str) + except WebDriverException: + logging.error(f"No Steam rating found for {steam_app_id}!") + except ValueError: + logging.error(f"Invalid Steam rating {recommendations_str} for {steam_app_id}!") + if steam_info.recommended_price_eur is None: try: element3: WebElement = driver.find_element( By.XPATH, STEAM_PRICE_DISCOUNTED_ORIGINAL ) price_str: str = element3.text - try: - steam_info.recommended_price_eur = float( - price_str.replace("€", "").replace(",", ".").strip() - ) - except ValueError: - pass - + steam_info.recommended_price_eur = float( + price_str.replace("€", "").replace(",", ".").strip() + ) except WebDriverException: logging.debug( f"No Steam discounted original price found on shop page for {steam_app_id}" ) + except ValueError: + logging.debug( + f"Steam discounted original price has wrong format for {steam_app_id}" + ) if steam_info.recommended_price_eur is None: try: element4: WebElement = driver.find_element(By.XPATH, STEAM_PRICE_FULL) - price2_str: str = element4.text.replace("€", "").strip() + price2_str: str = element4.text if "free" in price2_str.lower(): steam_info.recommended_price_eur = 0 else: - try: - steam_info.recommended_price_eur = float(price2_str) - except ValueError: - pass - + steam_info.recommended_price_eur = float( + price2_str.replace("€", "").replace(",", ".").strip() + ) except WebDriverException: logging.debug(f"No Steam full price found on shop page for {steam_app_id}") + except ValueError: + logging.debug(f"Steam full price has wrong format for {steam_app_id}") if steam_info.recommended_price_eur is None: logging.error(f"No Steam price found for {steam_app_id}") + if steam_info.release_date is None: + try: + element5: WebElement = driver.find_element(By.XPATH, STEAM_RELEASE_DATE) + release_date_str: str = element5.text + release_date_str = release_date_str.split("RELEASE DATE:")[1].strip() + release_date: datetime = datetime.strptime( + release_date_str, "%d %b, %Y" + ).replace(tzinfo=timezone.utc) + steam_info.release_date = release_date + + except WebDriverException: + logging.debug(f"No release date found on shop page for {steam_app_id}") + except (IndexError, ValueError): + logging.debug( + f"Release date in wrong format on shop page for {steam_app_id}" + ) return steam_info diff --git a/app/sqlalchemy.py b/app/sqlalchemy.py index 7b2ea59..ba5ae05 100644 --- a/app/sqlalchemy.py +++ b/app/sqlalchemy.py @@ -7,6 +7,7 @@ from typing import Any, Type from sqlalchemy import ( + JSON, Column, DateTime, Enum, @@ -20,7 +21,7 @@ select, ) from sqlalchemy.engine.interfaces import Dialect -from sqlalchemy.orm import Session, registry, relationship +from sqlalchemy.orm import registry, relationship, scoped_session, sessionmaker from alembic import command from alembic.config import Config as AlembicConfig @@ -72,9 +73,9 @@ class IgdbInfo(Base): __tablename__ = "igdb_info" id: int = Column(Integer, primary_key=True, nullable=False) - url: str | None = Column(String, nullable=False) + url: str = Column(String, nullable=False) - name: str | None = Column(String, nullable=False) + name: str = Column(String, nullable=False) short_description: str | None = Column(String) release_date: datetime | None = Column(AwareDateTime) @@ -104,9 +105,9 @@ class SteamInfo(Base): __tablename__ = "steam_info" id: int = Column(Integer, primary_key=True, nullable=False) - url: str | None = Column(String, nullable=False) + url: str = Column(String, nullable=False) - name: str | None = Column(String, nullable=False) + name: str = Column(String, nullable=False) short_description: str | None = Column(String) release_date: datetime | None = Column(AwareDateTime) genres: str | None = Column(String) @@ -152,8 +153,8 @@ class Offer(Base): title: str = Column(String, nullable=False) probable_game_name: str = Column(String, nullable=False) - seen_first: datetime | None = Column(AwareDateTime) - seen_last: datetime | None = Column(AwareDateTime) + seen_first: datetime = Column(AwareDateTime, nullable=False) + seen_last: datetime = Column(AwareDateTime, nullable=False) valid_from: datetime | None = Column(AwareDateTime) valid_to: datetime | None = Column(AwareDateTime) @@ -182,6 +183,37 @@ def __repr__(self) -> str: ) +class User(Base): + __tablename__ = "users" + + id: int = Column(Integer, primary_key=True, nullable=False) + + registration_date: datetime = Column(AwareDateTime) + offers_received_count: int = Column(Integer, default=0) + + telegram_id: int | None = Column(Integer) + telegram_chat_id: int | None = Column(Integer) + telegram_user_details: str | None = Column(JSON) + + telegram_subscriptions: list[TelegramSubscription] = relationship( + "TelegramSubscription", back_populates="user", cascade="all, delete-orphan" + ) + + +class TelegramSubscription(Base): + __tablename__ = "telegram_subscriptions" + + id: int = Column(Integer, primary_key=True, nullable=False) + + user_id: int = Column(Integer, ForeignKey("users.id"), nullable=False) + user: User = relationship("User", back_populates="telegram_subscriptions") + + source: Source = Column(Enum(Source), nullable=False) + type: OfferType = Column(Enum(OfferType), nullable=False) + + last_offer_id: int = Column(Integer, nullable=False, default=0) + + class LootDatabase: def __init__(self, echo: bool = False) -> None: # Run Alembic migrations first before we open a session @@ -193,8 +225,8 @@ def __init__(self, echo: bool = False) -> None: echo=echo, future=True, ) - - self.session = Session(self.engine) + session_factory = sessionmaker(bind=self.engine) + self.session = scoped_session(session_factory) def __enter__(self) -> LootDatabase: return self diff --git a/app/telegram.py b/app/telegram.py index 1801a9a..09a329f 100644 --- a/app/telegram.py +++ b/app/telegram.py @@ -1,26 +1,57 @@ +from __future__ import annotations + import html import json import logging import traceback +from datetime import datetime, timezone, timedelta +from types import TracebackType +from typing import Type +import humanize import telegram -from telegram import ParseMode, Update +from sqlalchemy import or_, select +from sqlalchemy.orm import Session +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ParseMode, Update from telegram.ext import ( CallbackContext, + CallbackQueryHandler, CommandHandler, Filters, MessageHandler, Updater, ) +from app.common import ( + TIMESTAMP_READABLE_WITH_HOUR, + TIMESTAMP_SHORT, + OfferType, + Source, +) from app.configparser import Config, ParsedConfig +from app.sqlalchemy import Game, Offer, TelegramSubscription, User, and_ logger = logging.getLogger(__name__) class TelegramBot: - def __init__(self, config: ParsedConfig): + def __init__(self, config: ParsedConfig, session: Session): self.config = config + self.session = session + + def __enter__(self) -> TelegramBot: + if self.config.telegram_bot: + self.start() + return self + + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + if self.updater is not None: + self.stop() def start(self) -> None: """Start the bot.""" @@ -30,9 +61,8 @@ def start(self) -> None: [ telegram.BotCommand("start", "Start the bot"), telegram.BotCommand("help", "Show help"), + telegram.BotCommand("manage", "Edit your subscriptions"), telegram.BotCommand("status", "Show your subscriptions"), - telegram.BotCommand("subscribe", "Subscribe to offers"), - telegram.BotCommand("unsubscribe", "Unsubscribe from offers"), ] ) @@ -42,12 +72,23 @@ def start(self) -> None: logging.info("Telegram Bot: Initialized") dispatcher.add_handler(CommandHandler("start", self.start_command)) + dispatcher.add_handler(CommandHandler("leave", self.leave_command)) dispatcher.add_handler(CommandHandler("help", self.help_command)) - dispatcher.add_handler(CommandHandler("status", self.help_command)) - dispatcher.add_handler(CommandHandler("subscribe", self.subscribe_command)) - dispatcher.add_handler(CommandHandler("unsubscribe", self.unsubscribe_command)) + dispatcher.add_handler(CommandHandler("manage", self.manage_command)) + dispatcher.add_handler(CommandHandler("status", self.status_command)) + dispatcher.add_handler(CommandHandler("offers", self.offers_command)) dispatcher.add_handler(CommandHandler("debug", self.debug_command)) - dispatcher.add_handler(CommandHandler("bad_command", self.bad_command)) + + dispatcher.add_handler( + CallbackQueryHandler(self.toggle_subscription_callback, pattern="toggle") + ) + dispatcher.add_handler( + CallbackQueryHandler(self.offer_details_callback, pattern="details") + ) + dispatcher.add_handler( + CallbackQueryHandler(self.close_menu_callback, pattern="close menu") + ) + dispatcher.add_handler(MessageHandler(Filters.command, self.unknown)) dispatcher.add_error_handler(self.error_handler) @@ -60,7 +101,7 @@ def stop(self) -> None: self.updater.stop() def error_handler(self, update: object, context: CallbackContext) -> None: # type: ignore - """Log the error and send a telegram message to notify the developer.""" + """Log the error and send a telegram message to notify the developer chat.""" # Log the error before we do anything else, so we can see it even if something breaks. logger.error(msg="Exception while handling an update:", exc_info=context.error) @@ -78,35 +119,151 @@ def error_handler(self, update: object, context: CallbackContext) -> None: # ty # Build the message with some markup and additional information about what happened. # TODO: Might need to add some logic to deal with messages longer than the 4096 character limit. update_str = update.to_dict() if isinstance(update, Update) else str(update) - message = ( + message_pt1 = ( f"An exception was raised while handling an update:\n\n" f"
    update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
                 "
    \n\n" f"
    context.chat_data = {html.escape(str(context.chat_data))}
    \n\n" f"
    context.user_data = {html.escape(str(context.user_data))}
    \n\n" - f"
    {html.escape(tb_string)}
    " ) - # Finally, send the message - context.bot.send_message( - chat_id=Config.get().telegram_developer_chat_id, - text=message, - parse_mode=ParseMode.HTML, - ) + message_pt2 = f"
    {html.escape(tb_string)}
    " - # Do some more specific error handling here: + logging.error(message_pt1 + message_pt2) + if len(message_pt1) + len(message_pt2) < 4096: + message = message_pt1 + message_pt2 + context.bot.send_message( + chat_id=Config.get().telegram_developer_chat_id, + text=message, + parse_mode=ParseMode.HTML, + ) + else: + if len(message_pt1) < 4096: + # Finally, send the message + context.bot.send_message( + chat_id=Config.get().telegram_developer_chat_id, + text=message_pt1, + parse_mode=ParseMode.HTML, + ) + if len(message_pt2) < 4096: + context.bot.send_message( + chat_id=Config.get().telegram_developer_chat_id, + text=message_pt2, + parse_mode=ParseMode.HTML, + ) + + if len(message_pt1) >= 4096 and len(message_pt2) >= 4096: + context.bot.send_message( + chat_id=Config.get().telegram_developer_chat_id, + text="There was an error, but the message is too long to send.", + parse_mode=ParseMode.HTML, + ) + + # TODO: Do some more specific error handling here: # - If the user blacked our bot, remove him from the database if isinstance(context.error, telegram.TelegramError): if context.error.message == "Unauthorized": pass pass - def bad_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - """Raise an error to trigger the error handler.""" - context.bot.wrong_method_name() # type: ignore[attr-defined] + def manage_command(self, update: Update, context: CallbackContext) -> None: # type: ignore + """Handle the /manage command: Manage subscriptions.""" + if update.message is None or update.effective_user is None: + return + + db_user = self.get_user(update.effective_user.id) + if db_user is None: + update.message.reply_markdown_v2( + "You are not registered. Please, register with /start command." + ) + return + + update.message.reply_text( + self.manage_menu_message(), reply_markup=self.manage_menu_keyboard(db_user) + ) + + def offers_command(self, update: Update, context: CallbackContext) -> None: # type: ignore + """Handle the /offers command: Send all subscriptions once.""" + if update.message is None or update.effective_user is None: + return + + db_user = self.get_user(update.effective_user.id) + if db_user is None: + update.message.reply_markdown_v2( + "You are not registered. Please, register with /start command." + ) + return + + if ( + db_user.telegram_subscriptions is None + or len(db_user.telegram_subscriptions) == 0 + ): + update.message.reply_markdown_v2( + "You have no subscriptions\\. Change that with /manage\\." + ) + return + + if not self.send_new_offers(db_user): + update.message.reply_markdown_v2( + "No new offers available\\. I will write you as soon as there are new offers, I promise\\!" + ) + + def send_new_offers(self, user: User) -> bool: + """Send all new offers for the user.""" + + subscriptions = user.telegram_subscriptions + + offers_sent = 0 + subscription: TelegramSubscription + for subscription in subscriptions: + offers: list[Offer] = ( + self.session.execute( + select(Offer).where( + and_( + Offer.type == subscription.type, + Offer.source == subscription.source, + Offer.id > subscription.last_offer_id, + or_( + Offer.valid_from <= datetime.now().replace(tzinfo=None), # type: ignore + Offer.valid_from == None, # noqa: E711 + ), + or_( + Offer.valid_to >= datetime.now().replace(tzinfo=None), # type: ignore + and_( + Offer.valid_from == None, # noqa: E711 + Offer.seen_first + >= datetime.now().replace(tzinfo=timezone.utc) + - timedelta(days=7), + ), + ), + ) + ) + ) + .scalars() + .all() + ) + + if len(offers) == 0: + continue + + offers_sent += len(offers) + + # Send the offers + for offer in offers: + self.send_offer(offer, user) + + # Update the last offer id + subscription.last_offer_id = offers[-1].id + user.offers_received_count = user.offers_received_count + len(offers) + self.session.commit() + + if offers_sent: + return True + else: + return False def debug_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - """Show some debug information.""" + """Handle the /debug command: Show some debug information.""" if update.message is None: return @@ -125,60 +282,86 @@ def debug_command(self, update: Update, context: CallbackContext) -> None: # ty ) def start_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - """Handle the /start command.""" + """Handle the /start command: Register the user and display guide.""" if update.message is None or update.effective_user is None: return - # TODO: Register user (database) if not registered - # - Date of registration (=now) - # - Telegram user ID - # - Telegram user details - # - Telegram chat ID - # - Number of offers received - # - Total saved EUR + welcome_text = ( + R"I belong to the [LootScraper](https://github\.com/eikowagenknecht/lootscraper) project\. " + R"If you have any issues or feature request, please use the " + R"[issues](https://github\.com/eikowagenknecht/lootscraper/issues) to report them\. " + R"And if you like it, please consider " + R"[⭐ starring it on GitHub](https://github\.com/eikowagenknecht/lootscraper/stargazers)\. " + R"Thanks\!" + "\n\n" + R"*How this works*" + "\n" + R"You tell me what kind of offers you want to see with the /manage command\. " + R"I will then send you a message with all current offers of that kind if you want\. " + R"I will also send you a message every time a new offer is added\. " + R"To see the commands you can use to talk to me, type /help now\." + "\n\n" + R"*Privacy*" + "\n" + R"I need to store some user data \(e\.g\. your Telegram user ID and your subscriptions\) to work\. " + R"You can leave any time by typing /leave\. " + R"This instantly deletes all data about you\. " + R"Also I will be sad to see you go\." + ) + + db_user = self.get_user(update.effective_user.id) + + if db_user is not None: + update.message.reply_markdown_v2( + Rf"Welcome back, {update.effective_user.mention_markdown_v2()} 👋\. " + + R"You are already registered ❤\. " + + R"In case you forgot, this was my initial message to you:" + + "\n\n" + + welcome_text, + ) + return + + # Register user if not registered yet + new_user = User( + telegram_id=update.effective_user.id, + telegram_chat_id=update.effective_chat.id + if update.effective_chat + else None, + telegram_user_details=update.effective_user.to_json(), + registration_date=datetime.now().replace(tzinfo=timezone.utc), + ) + self.session.add(new_user) + self.session.commit() + update.message.reply_markdown_v2( - ( - Rf"Hi {update.effective_user.mention_markdown_v2()}, welcome to the LootScraper Telegram Bot\!" - "\n\n" - R"I belong to the [LootScraper](https://github\.com/eikowagenknecht/lootscraper) project\. " - R"If you have any issues or feature request, please use the " - R"[issues](https://github\.com/eikowagenknecht/lootscraper/issues) to report them\. " - R"And if you like it, please consider " - R"[starring it on GitHub](https://github\.com/eikowagenknecht/lootscraper/stargazers)\. " - R"Thanks\!" - "\n\n" - R"*How this works*" - "\n" - R"You tell me what kind of offers you want to see\. " - R"I will then send you a message with all current offers of that kind\. " - R"I will also send you a message every time a new offer is added\. " - R"To see the commands you can use to talk to me, type /help now\." - "\n\n" - R"*Privacy*" - "\n" - R"I need to store some user data \(e\.g\. your Telegram user ID\ and your subscriptions) to work\. " - R"You can leave any time by typing /leave\. " - R"This instantly deletes all data about you\." - R"Also I will be sad to see you go\." - ), + Rf"Hi {update.effective_user.mention_markdown_v2()} 👋, welcome to the LootScraper Telegram Bot and thank you for registering\!" + + "\n\n" + + welcome_text, ) def leave_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - """Handle the /leave command.""" + """Handle the /leave command: Unregister the user.""" if update.message is None or update.effective_user is None: return - # TODO: Delete user from database (if registered) - update.message.reply_markdown_v2( - ( - Rf"Hi {update.effective_user.mention_markdown_v2()}, I'm sad to see you go\. " - R"Your user data has been deleted\. " - R"If you want to come back at any time, just type /start to start again\!" - ), - ) + db_user = self.get_user(update.effective_user.id) + + if db_user is None: + update.message.reply_markdown_v2( + ( + Rf"Hi {update.effective_user.mention_markdown_v2()}, you are currently not registered\. " + R"So you can't leave ;\-\)" + ), + ) + return + + # Delete user from database (if registered) + self.session.delete(db_user) + self.session.commit() + update.message.reply_markdown_v2( ( - Rf"Hi {update.effective_user.mention_markdown_v2()}, you are currently not registered\. " + Rf"Bye {update.effective_user.mention_markdown_v2()}, I'm sad to see you go\. " R"Your user data has been deleted\. " R"If you want to come back at any time, just type /start to start again\!" ), @@ -188,6 +371,7 @@ def help_command(self, update: Update, context: CallbackContext) -> None: # typ """Handle the /help command: Display all available commands to the user.""" if update.message is None: return + update.message.reply_markdown_v2( ( R"*Available commands*" @@ -198,103 +382,445 @@ def help_command(self, update: Update, context: CallbackContext) -> None: # typ "\n" R"/status \- Show information about your subscriptions" "\n" - R"/subscribe _type_ \- Start receiving offers for _type_" - "\n" - R" \- _amazon game_ \- Free games from Amazon" - "\n" - R" \- _amazon loot_ \- Free loot from Amazon" - "\n" - R" \- _epic_ \- Free games from Epic Games" - "\n" - R" \- _steam_ \- Free games from Steam" + R"/offers \- Send all current offers once \(only from the categories you are subscribed to\)" "\n" - R" \- _gog_ \- Free games from GOG" - "\n" - R" \- _all_ \- All of the above" - "\n" - R"/unsubscribe _type_ \- Stop receiving offers for _type_" + R"/manage \- Manage your subscriptions" "\n" R"/leave \- Leave this bot and delete stored user data" ) ) - def subscribe_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - if not update.effective_chat or not update.message or not update.message.text: + def status_command(self, update: Update, context: CallbackContext) -> None: # type: ignore + """Handle the /status command: Display some statistics about the user.""" + if not update.effective_chat or not update.effective_user or not update.message: return - # TODO: Check if user is registered, otherwise return error message - - subscription_type = ( - update.message.text.lower().removeprefix("/subscribe").strip() - ) - - if subscription_type == "": + db_user = self.get_user(update.effective_user.id) + if db_user is None: update.message.reply_markdown_v2( - R"Sorry, this command needs a type to work\. Type /help to see all available commands\." + Rf"Hi {update.effective_user.mention_markdown_v2()}, you are currently not registered\. " + R"So there is no data stored about you\. " + R"But I'd be happy to see you register any time with the /start command\!" ) - elif subscription_type == "gog": - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not implemented yet\." + return + + subscriptions_text: str + if len(db_user.telegram_subscriptions) > 0: + subscriptions_text = ( + Rf"\- You have {len(db_user.telegram_subscriptions)} subscriptions\. " ) - elif subscription_type == "steam": - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not implemented yet\." + subscriptions_text += ( + R"Here are the categories you are subscribed to: " + "\n" ) - elif subscription_type == "epic": - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not implemented yet\." + for subscription in db_user.telegram_subscriptions: + subscriptions_text += ( + Rf"\* {subscription.source.value} \({subscription.type.value}\)" + + "\n" + ) + subscriptions_text += ( + R"You can unsubscribe from them any time with /manage\." ) - elif subscription_type == "amazon game": - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not implemented yet\." + else: + subscriptions_text = ( + R"\- You are currently not subscribed to any categories\." + R"\- You can change that with the /manage command if you wish\." ) - elif subscription_type == "amazon loot": - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not implemented yet\." + + update.message.reply_markdown_v2( + Rf"Hi {update.effective_user.mention_markdown_v2()}, you are currently registered\. " + R"But I'm not storing much user data, so this is all I know about you: " + "\n\n" + Rf"\- You registered on {markdown_escape(db_user.registration_date.strftime(TIMESTAMP_READABLE_WITH_HOUR))} with the /start command\." + "\n" + Rf"\- Your Telegram chat id is {db_user.telegram_chat_id}\. " + R"Neat, huh? " + R"I use it to send you notifications\." + "\n" + f"{subscriptions_text}" + "\n" + Rf"\- You received {db_user.offers_received_count} offers so far\. " + ) + + def unknown(self, update: Update, context: CallbackContext) -> None: # type: ignore + if not update.effective_chat: + return + + context.bot.send_message( + chat_id=update.effective_chat.id, + text="Sorry, I didn't understand that command. Type /help to see all commands.", + ) + + def get_user(self, telegram_id: int) -> User | None: + db_user = ( + self.session.execute(select(User).where(User.telegram_id == telegram_id)) + .scalars() + .one_or_none() + ) + + return db_user + + def is_subscribed(self, user: User, type: OfferType, source: Source) -> bool: + subscription = self.session.execute( + select(TelegramSubscription).where( + and_( + TelegramSubscription.user_id == user.id, + TelegramSubscription.type == type, + TelegramSubscription.source == source, + ) ) - elif subscription_type == "all": - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not implemented yet\." + ).scalar_one_or_none() + return subscription is not None + + def subscribe(self, user: User, type: OfferType, source: Source) -> None: + self.session.add(TelegramSubscription(user=user, source=source, type=type)) + self.session.commit() + + def unsubscribe(self, user: User, type: OfferType, source: Source) -> None: + self.session.query(TelegramSubscription).filter( + and_( + TelegramSubscription.user_id == user.id, + TelegramSubscription.type == type, + TelegramSubscription.source == source, ) - else: - update.message.reply_markdown_v2( - Rf"Sorry, '{subscription_type}' is not a valid subscription type\. Type /help to see all available commands\." + ).delete() + self.session.commit() + + def manage_menu(self, update: Update, context: CallbackContext) -> None: # type: ignore + if update.callback_query is None or update.effective_user is None: + return + + db_user = self.get_user(update.effective_user.id) + if db_user is None: + update.callback_query.answer( + text="You are not registered. Please, register with /start command." ) - pass + return + + update.callback_query.answer() + update.callback_query.edit_message_text( + text=self.manage_menu_message(), + reply_markup=self.manage_menu_keyboard(db_user), + ) + + def manage_menu_message(self) -> str: + return ( + "Here you can manage your subscriptions. " + "To do so, just click the following buttons to subscribe / unsubscribe. " + ) + + def manage_menu_close_message(self) -> str: + return ( + "Thank you for managing your subscriptions. " + "Forgot something? " + "You can continue any time with /manage. " + "If you want me to send you all current offers of your subscriptions, you can type /offers now or any time later." + ) + + def manage_menu_keyboard(self, user: User) -> InlineKeyboardMarkup: + keyboard: list[list[InlineKeyboardButton]] = [] + + if any( + x.source == Source.AMAZON and x.type == OfferType.GAME + for x in user.telegram_subscriptions + ): + keyboard.append(keyboard_button_row(True, Source.AMAZON, OfferType.GAME)) + else: + keyboard.append(keyboard_button_row(False, Source.AMAZON, OfferType.GAME)) + + if any( + x.source == Source.AMAZON and x.type == OfferType.LOOT + for x in user.telegram_subscriptions + ): + keyboard.append(keyboard_button_row(True, Source.AMAZON, OfferType.LOOT)) + else: + keyboard.append(keyboard_button_row(False, Source.AMAZON, OfferType.LOOT)) + + if any( + x.source == Source.EPIC and x.type == OfferType.GAME + for x in user.telegram_subscriptions + ): + keyboard.append(keyboard_button_row(True, Source.EPIC, OfferType.GAME)) + else: + keyboard.append(keyboard_button_row(False, Source.EPIC, OfferType.GAME)) + + if any( + x.source == Source.GOG and x.type == OfferType.GAME + for x in user.telegram_subscriptions + ): + keyboard.append(keyboard_button_row(True, Source.GOG, OfferType.GAME)) + else: + keyboard.append(keyboard_button_row(False, Source.GOG, OfferType.GAME)) - def unsubscribe_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - if not update.effective_chat or not update.message or not update.message.text: + if any( + x.source == Source.STEAM and x.type == OfferType.GAME + for x in user.telegram_subscriptions + ): + keyboard.append(keyboard_button_row(True, Source.STEAM, OfferType.GAME)) + else: + keyboard.append(keyboard_button_row(False, Source.STEAM, OfferType.GAME)) + + keyboard.append( + [InlineKeyboardButton(text="Close", callback_data="close menu")] + ) + + return InlineKeyboardMarkup(keyboard) + + def offer_details_keyboard(self, offer: Offer) -> InlineKeyboardMarkup: + keyboard: list[list[InlineKeyboardButton]] = [] + keyboard.append( + [ + InlineKeyboardButton( + text="Show details", callback_data=f"details {offer.id}" + ) + ] + ) + return InlineKeyboardMarkup(keyboard) + + def offer_details_callback(self, update: Update, context: CallbackContext) -> None: # type: ignore + if update.callback_query is None or update.effective_user is None: + return + + query = update.callback_query + if query.data is None: return - # TODO: Check if user is registered, otherwise return error message + offer_id = int(query.data.split(" ")[1]) + offer = self.session.execute(select(Offer).where(Offer.id == offer_id)).scalar() - subscription_type = ( - update.message.text.lower().removeprefix("/unsubscribe").strip() + query.answer() + query.edit_message_text( + text=self.offer_details_message(offer), + parse_mode=ParseMode.MARKDOWN_V2, + reply_markup=None, ) - print(subscription_type) - def status_command(self, update: Update, context: CallbackContext) -> None: # type: ignore - if not update.effective_chat: + def close_menu_callback(self, update: Update, context: CallbackContext) -> None: # type: ignore + if update.callback_query is None or update.effective_user is None: return - # TODO: Check if user is registered, then display some stats: - # - Active subscriptions - # - Total saved EUR - # - Number of offers received + query = update.callback_query - if update.effective_chat: - text_caps = " ".join(context.args).upper() # type: ignore - context.bot.send_message(chat_id=update.effective_chat.id, text=text_caps) + if query.data != "close menu": + return - def unknown(self, update: Update, context: CallbackContext) -> None: # type: ignore - if not update.effective_chat: + query.answer(text="Bye!") + query.edit_message_text( + text=self.manage_menu_close_message(), + reply_markup=None, + ) + + def toggle_subscription_callback(self, update: Update, context: CallbackContext) -> None: # type: ignore + query = update.callback_query + if query is None or update.effective_user is None or query.data is None: return - context.bot.send_message( - chat_id=update.effective_chat.id, - text="Sorry, I didn't understand that command. Type /help to see all commands.", + db_user = self.get_user(update.effective_user.id) + if db_user is None: + query.answer( + text="You are not registered. Please, register with /start command." + ) + return + + subscription_type = query.data.lower().removeprefix("toggle").strip() + + answer_text = None + + if subscription_type == "amazon game": + if not self.is_subscribed(db_user, OfferType.GAME, Source.AMAZON): + self.subscribe(db_user, OfferType.GAME, Source.AMAZON) + answer_text = answer(True, Source.AMAZON, OfferType.GAME) + else: + self.unsubscribe(db_user, OfferType.GAME, Source.AMAZON) + answer_text = answer(False, Source.AMAZON, OfferType.GAME) + elif subscription_type == "amazon loot": + if not self.is_subscribed(db_user, OfferType.LOOT, Source.AMAZON): + self.subscribe(db_user, OfferType.LOOT, Source.AMAZON) + answer_text = answer(True, Source.AMAZON, OfferType.LOOT) + else: + self.unsubscribe(db_user, OfferType.LOOT, Source.AMAZON) + answer_text = answer(False, Source.AMAZON, OfferType.LOOT) + elif subscription_type == "epic game": + if not self.is_subscribed(db_user, OfferType.GAME, Source.EPIC): + self.subscribe(db_user, OfferType.GAME, Source.EPIC) + answer_text = answer(True, Source.EPIC, OfferType.GAME) + else: + self.unsubscribe(db_user, OfferType.GAME, Source.EPIC) + answer_text = answer(False, Source.EPIC, OfferType.GAME) + elif subscription_type == "gog game": + if not self.is_subscribed(db_user, OfferType.GAME, Source.GOG): + self.subscribe(db_user, OfferType.GAME, Source.GOG) + answer_text = answer(True, Source.GOG, OfferType.GAME) + else: + self.unsubscribe(db_user, OfferType.GAME, Source.GOG) + answer_text = answer(False, Source.GOG, OfferType.GAME) + elif subscription_type == "steam game": + if not self.is_subscribed(db_user, OfferType.GAME, Source.STEAM): + self.subscribe(db_user, OfferType.GAME, Source.STEAM) + answer_text = answer(True, Source.STEAM, OfferType.GAME) + else: + self.unsubscribe(db_user, OfferType.GAME, Source.STEAM) + answer_text = answer(False, Source.STEAM, OfferType.GAME) + + query.answer(text=answer_text) + query.edit_message_text( + text=self.manage_menu_message(), + reply_markup=self.manage_menu_keyboard(db_user), ) + def send_offer(self, offer: Offer, user: User) -> None: + self.updater.bot.send_message( + chat_id=user.telegram_chat_id, + text=self.offer_message(offer), + parse_mode=ParseMode.MARKDOWN_V2, + reply_markup=self.offer_details_keyboard(offer), + ) + + def offer_message(self, offer: Offer) -> str: + content = Rf"*{offer.source.value} \({offer.type.value}\) \- {markdown_escape(offer.title)}*" + + if offer.img_url: + content += " " + markdown_url(offer.img_url, f"[{offer.id}]") + elif offer.game and offer.game.steam_info and offer.game.steam_info.image_url: + content += " " + markdown_url( + offer.game.steam_info.image_url, f"[{offer.id}]" + ) + else: + content += f" [{offer.id}]" + + if offer.valid_to: + time_to_end = humanize.naturaldelta( + datetime.now().replace(tzinfo=timezone.utc) - offer.valid_to + ) + if datetime.now().replace(tzinfo=timezone.utc) > offer.valid_to: + content += f"\nOffer expired {markdown_escape(time_to_end)} ago" + else: + content += f"\nOffer expires in {markdown_escape(time_to_end)}" + content += f" \\({markdown_escape(offer.valid_to.strftime(TIMESTAMP_READABLE_WITH_HOUR))}\\)\\." + else: + content += "\nOffer is valid forever\\.\\. just kidding, we just don't know when it will end, so grab it now\\!" + + if offer.url: + content += ( + "Claim it now for free on " + + markdown_url(offer.url, offer.source.value) + + R"\!" + ) + + return content + + def offer_details_message(self, offer: Offer) -> str: + content = self.offer_message(offer) + + if offer.game: + game: Game = offer.game + + content += "\n\n__About the game__\n\n" + if game.igdb_info and game.igdb_info.name: + content += Rf"*Name:* {markdown_escape(game.igdb_info.name)}" + "\n\n" + elif game.steam_info and game.steam_info.name: + content += Rf"*Name:* {markdown_escape(game.steam_info.name)}" + "\n\n" + + ratings = [] + if game.steam_info and game.steam_info.metacritic_score: + text = f"Metacritic {game.steam_info.metacritic_score} %" + if game.steam_info.metacritic_url: + text = markdown_url(game.steam_info.metacritic_url, text) + ratings.append(text) + if ( + game.steam_info + and game.steam_info.percent + and game.steam_info.score + and game.steam_info.recommendations + ): + text = Rf"Steam {game.steam_info.percent} % ({game.steam_info.score}/10, {game.steam_info.recommendations} recommendations)" + text = markdown_url(game.steam_info.url, text) + ratings.append(text) + if ( + game.igdb_info + and game.igdb_info.meta_ratings + and game.igdb_info.meta_score + ): + text = Rf"IGDB Meta {game.igdb_info.meta_score} % ({game.igdb_info.meta_ratings} sources)" + text = markdown_url(game.igdb_info.url, text) + ratings.append(text) + if ( + game.igdb_info + and game.igdb_info.user_ratings + and game.igdb_info.user_score + ): + text = Rf"IGDB User {game.igdb_info.user_score} % ({game.igdb_info.user_ratings} sources)" + text = markdown_url(game.igdb_info.url, text) + ratings.append(text) + + if len(ratings) > 0: + ratings_str = f"*Ratings:* {' / '.join(ratings)}\n\n" + content += ratings_str + if game.igdb_info and game.igdb_info.release_date: + content += f"*Release date:* {markdown_escape(game.igdb_info.release_date.strftime(TIMESTAMP_SHORT))}\n\n" + elif game.steam_info and game.steam_info.release_date: + content += f"*Release date:* {markdown_escape(game.steam_info.release_date.strftime(TIMESTAMP_SHORT))}\n\n" + if game.steam_info and game.steam_info.recommended_price_eur: + content += ( + Rf"*Recommended price \(Steam\):* {markdown_escape(str(game.steam_info.recommended_price_eur))} EUR" + + "\n\n" + ) + if game.igdb_info and game.igdb_info.short_description: + content += f"*Description:* {markdown_escape(game.igdb_info.short_description)}\n\n" + elif game.steam_info and game.steam_info.short_description: + content += f"*Description:* {markdown_escape(game.steam_info.short_description)}\n\n" + if game.steam_info and game.steam_info.genres: + content += f"*Genres:* {markdown_escape(game.steam_info.genres)}\n\n" + content += R"\* Any information about the offer is automatically grabbed and may in rare cases not match the correct game\." + + return content + def markdown_json_formatted(input: str) -> str: return f"```json\n{input}\n```" + + +def keyboard_button_row( + active: bool, + source: Source, + offer_type: OfferType, +) -> list[InlineKeyboardButton]: + button_state = " - subscribed" if active else "" + source_str = f"{source.value} ({offer_type.value})" + command = f"toggle {source.name} {offer_type.name}" + + return [InlineKeyboardButton(f"{source_str}{button_state}", callback_data=command)] + + +def answer(new_state: bool, source: Source, offer_type: OfferType) -> str: + source_str = f"{source.value} ({offer_type.value})" + if new_state: + return f"Congratulations! You are now subscribed to {source_str} offers." + else: + return f"You are now unsubscribed from {source_str} offers." + + +def markdown_escape(input: str) -> str: + return ( + input.replace("_", "\\_") + .replace("*", "\\*") + .replace("[", "\\[") + .replace("]", "\\]") + .replace("(", "\\(") + .replace(")", "\\)") + .replace("~", "\\~") + .replace("`", "\\`") + .replace(">", "\\>") + .replace("#", "\\#") + .replace("+", "\\+") + .replace("-", "\\-") + .replace("=", "\\=") + .replace("|", "\\|") + .replace("{", "\\{") + .replace("}", "\\}") + .replace(".", "\\.") + .replace("!", "\\!") + ) + + +def markdown_url(url: str, text: str) -> str: + return Rf"[{markdown_escape(text)}]({markdown_escape(url)})" diff --git a/config.default.ini b/config.default.ini index 637b939..d048b88 100644 --- a/config.default.ini +++ b/config.default.ini @@ -27,11 +27,10 @@ IGDB = yes [actions] ScrapeGames = yes ScrapeLoot = yes -; not used yet ScrapeInfo = yes GenerateFeed = yes UploadFtp = no -Telegram = no +TelegramBot = no [telegram] AccessToken = TOKEN_HERE diff --git a/lootscraper.py b/lootscraper.py index 8dfc946..7c13b9a 100644 --- a/lootscraper.py +++ b/lootscraper.py @@ -10,6 +10,7 @@ from selenium.webdriver.chrome.webdriver import WebDriver from sqlalchemy import select from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import Session from app.common import TIMESTAMP_LONG, OfferType, Source from app.configparser import Config @@ -21,8 +22,8 @@ from app.scraper.loot.epic_games import EpicScraper from app.scraper.loot.gog import GogScraper from app.scraper.loot.steam import SteamScraper -from app.sqlalchemy import Game, LootDatabase, Offer -from sqlalchemy.orm import Session +from app.sqlalchemy import Game, LootDatabase, Offer, SteamInfo, User +from app.telegram import TelegramBot from app.upload import upload_to_server exit = Event() @@ -64,32 +65,42 @@ def main() -> None: logging.getLogger().addHandler(stream_handler) logging.info("Starting script") - # Run the job every hour (or whatever is set in the config file). This is - # not exact because it does not account for the execution time, but that - # doesn't matter in our context. - run = 1 - while not exit.is_set(): - logging.info(f"Starting Run # {run}") - - try: - job() - except OperationalError as oe: - logging.error(f"Database error: {oe}") - logging.error("Database error, exiting applications") - sys.exit() - except Exception as e: - # Something unexpected occurred, log it and continue with the next run as usual - logging.exception(e) - - time_between_runs = int(Config.get().wait_between_runs) - if time_between_runs == 0: - break - next_execution = datetime.now() + timedelta(seconds=time_between_runs) - - logging.info(f"Waiting until {next_execution.isoformat()} for next execution") - - run += 1 - exit.wait(time_between_runs) + with ( + LootDatabase(echo=Config.get().db_echo) as db, + TelegramBot(Config.get(), db.session) as bot, + ): + # Run the job every hour (or whatever is set in the config file). This is + # not exact because it does not account for the execution time, but that + # doesn't matter in our context. + run = 1 + while not exit.is_set(): + logging.info(f"Starting Run # {run}") + + try: + job(db) + if Config.get().telegram_bot: + session: Session = db.session + for user in session.execute(select(User)).scalars().all(): + bot.send_new_offers(user) + except OperationalError as oe: + logging.error(f"Database error: {oe}") + logging.error("Database error, exiting applications") + sys.exit() + except Exception as e: + # Something unexpected occurred, log it and continue with the next run as usual + logging.exception(e) + + time_between_runs = int(Config.get().wait_between_runs) + if time_between_runs == 0: + break + next_execution = datetime.now() + timedelta(seconds=time_between_runs) + + logging.info( + f"Waiting until {next_execution.isoformat()} for next execution" + ) + + run += 1 + exit.wait(time_between_runs) logging.info(f"Exiting script after {run} runs") @@ -99,10 +110,10 @@ def quit(signo: int, _frame: FrameType | None) -> None: exit.set() -def job() -> None: - db: LootDatabase +def job(db: LootDatabase) -> None: webdriver: WebDriver - with (LootDatabase(echo=Config.get().db_echo) as db, get_pagedriver() as webdriver): + with get_pagedriver() as webdriver: + # refresh_all_steam_info(db.session, webdriver) # DEBUG ONLY scraped_offers: dict[str, dict[str, list[Offer]]] = {} cfg_what_to_scrape = { @@ -210,51 +221,24 @@ def job() -> None: for db_offer in loot_offers_in_db[scraper_source][scraper_type]: add_game_info(db_offer, db.session, webdriver) - if Config.get().generate_feed: - feed_file_base = Config.data_path() / Path( - Config.get().feed_file_prefix + ".xml" - ) - # Generate and upload feeds split by source - any_feed_changed = False - for scraper_source in loot_offers_in_db: - for scraper_type in loot_offers_in_db[scraper_source]: - feed_changed = False - feed_file = Config.data_path() / Path( - Config.get().feed_file_prefix - + f"_{Source[scraper_source].name.lower()}" - + f"_{OfferType[scraper_type].name.lower()}" - + ".xml" - ) - old_hash = hash_file(feed_file) - generate_feed( - offers=loot_offers_in_db[scraper_source][scraper_type], - feed_file_base=feed_file_base, - author_name=Config.get().feed_author_name, - author_web=Config.get().feed_author_web, - author_mail=Config.get().feed_author_mail, - feed_url_prefix=Config.get().feed_url_prefix, - feed_url_alternate=Config.get().feed_url_alternate, - feed_id_prefix=Config.get().feed_id_prefix, - source=Source[scraper_source], - type=OfferType[scraper_type], - ) - new_hash = hash_file(feed_file) - if old_hash != new_hash: - feed_changed = True - any_feed_changed = True - - if feed_changed and Config.get().upload_feed: - upload_to_server(feed_file) - - # Generate and upload cumulated feed - all_offers = [] - for scraper_source in loot_offers_in_db: - for scraper_type in loot_offers_in_db[scraper_source]: - all_offers.extend(loot_offers_in_db[scraper_source][scraper_type]) - - if any_feed_changed: + if Config.get().generate_feed: + feed_file_base = Config.data_path() / Path( + Config.get().feed_file_prefix + ".xml" + ) + # Generate and upload feeds split by source + any_feed_changed = False + for scraper_source in loot_offers_in_db: + for scraper_type in loot_offers_in_db[scraper_source]: + feed_changed = False + feed_file = Config.data_path() / Path( + Config.get().feed_file_prefix + + f"_{Source[scraper_source].name.lower()}" + + f"_{OfferType[scraper_type].name.lower()}" + + ".xml" + ) + old_hash = hash_file(feed_file) generate_feed( - offers=all_offers, + offers=loot_offers_in_db[scraper_source][scraper_type], feed_file_base=feed_file_base, author_name=Config.get().feed_author_name, author_web=Config.get().feed_author_web, @@ -262,14 +246,66 @@ def job() -> None: feed_url_prefix=Config.get().feed_url_prefix, feed_url_alternate=Config.get().feed_url_alternate, feed_id_prefix=Config.get().feed_id_prefix, + source=Source[scraper_source], + type=OfferType[scraper_type], ) - if Config.get().upload_feed: - upload_to_server(feed_file_base) - else: - logging.info("Skipping upload, disabled") - - else: - logging.info("Skipping feed generation, disabled") + new_hash = hash_file(feed_file) + if old_hash != new_hash: + feed_changed = True + any_feed_changed = True + + if feed_changed and Config.get().upload_feed: + upload_to_server(feed_file) + + # Generate and upload cumulated feed + all_offers = [] + for scraper_source in loot_offers_in_db: + for scraper_type in loot_offers_in_db[scraper_source]: + all_offers.extend(loot_offers_in_db[scraper_source][scraper_type]) + + if any_feed_changed: + generate_feed( + offers=all_offers, + feed_file_base=feed_file_base, + author_name=Config.get().feed_author_name, + author_web=Config.get().feed_author_web, + author_mail=Config.get().feed_author_mail, + feed_url_prefix=Config.get().feed_url_prefix, + feed_url_alternate=Config.get().feed_url_alternate, + feed_id_prefix=Config.get().feed_id_prefix, + ) + if Config.get().upload_feed: + upload_to_server(feed_file_base) + else: + logging.info("Skipping upload, disabled") + + else: + logging.info("Skipping feed generation, disabled") + + db.session.commit() + + +def refresh_all_steam_info(session: Session, webdriver: WebDriver) -> None: + """ + Refresh Steam information for all games in the database + """ + logging.info("Refreshing Steam information") + steam_info: SteamInfo + for steam_info in session.query(SteamInfo): + new_steam_info = get_steam_details(id=steam_info.id, driver=webdriver) + if new_steam_info is None: + return + steam_info.name = new_steam_info.name + steam_info.short_description = new_steam_info.short_description + steam_info.release_date = new_steam_info.release_date + steam_info.publishers = new_steam_info.publishers + steam_info.image_url = new_steam_info.image_url + steam_info.recommendations = new_steam_info.recommendations + steam_info.percent = new_steam_info.percent + steam_info.score = new_steam_info.score + steam_info.metacritic_score = new_steam_info.metacritic_score + steam_info.metacritic_url = new_steam_info.metacritic_url + steam_info.recommended_price_eur = new_steam_info.recommended_price_eur def add_game_info(offer: Offer, session: Session, webdriver: WebDriver) -> None: diff --git a/requirements.txt b/requirements.txt index d73a1a2..3e11410 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/tests/test_test.py b/tests/test_test.py index 0295ca7..3e2f46a 100644 --- a/tests/test_test.py +++ b/tests/test_test.py @@ -4,17 +4,16 @@ from time import sleep from selenium.webdriver.chrome.webdriver import WebDriver +from sqlalchemy import select +from sqlalchemy.orm import Session from app.common import TIMESTAMP_LONG from app.configparser import Config from app.pagedriver import get_pagedriver from app.scraper.info.igdb import get_igdb_id -from app.scraper.info.steam import ( - get_steam_id, - get_steam_details, -) +from app.scraper.info.steam import get_steam_details, get_steam_id from app.scraper.info.utils import get_match_score -from app.sqlalchemy import LootDatabase +from app.sqlalchemy import LootDatabase, Offer, User from app.telegram import TelegramBot logging.basicConfig( @@ -26,21 +25,54 @@ class VariousTests(unittest.TestCase): def test_entity_framework(self) -> None: - with LootDatabase() as db: + with LootDatabase(echo=True) as db: db.initialize_or_update() - res = db.read_all() - print(res) - pass + self.assertTrue(True) def test_telegram(self) -> None: - # Arrange - bot = TelegramBot(Config.get()) - # Act - bot.start() - sleep(1) - bot.stop() - # Assert - self.assertEqual(1, 1) + with LootDatabase(echo=True) as db: + # Arrange + # Act + with TelegramBot(Config.get(), db.session): + sleep(10000) + # Assert + + def test_telegram_messagesend(self) -> None: + with ( + LootDatabase(echo=True) as db, + TelegramBot(Config.get(), db.session) as bot, + ): + # Arrange + session: Session = db.session + + # Act + offer: Offer = session.execute(select(Offer)).scalars().first() + user: User = ( + session.execute(select(User).where(User.telegram_id == 724039662)) + .scalars() + .first() + ) + bot.send_offer(offer, user) + + # Assert + + def test_telegram_new_offers(self) -> None: + with ( + LootDatabase(echo=True) as db, + TelegramBot(Config.get(), db.session) as bot, + ): + # Arrange + session: Session = db.session + + # Act + user: User = ( + session.execute(select(User).where(User.telegram_id == 724039662)) + .scalars() + .first() + ) + bot.send_new_offers(user) + + # Assert def test_similarity(self) -> None: search = "Rainbow Six Siege" @@ -84,67 +116,94 @@ def test_igdb_id_resolution_with_special_chars(self) -> None: def test_steam_appinfo(self) -> None: driver: WebDriver with get_pagedriver() as driver: - game = get_steam_details(driver, title="Counter-Strike") - self.assertIsNotNone(game) - self.assertEquals(game.name, "Counter-Strike") - self.assertIsNotNone(game.short_description) + steam_info = get_steam_details(driver, title="Counter-Strike") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "Counter-Strike") + self.assertIsNotNone(steam_info.short_description) self.assertEquals( - game.release_date.isoformat(), "2000-11-01T00:00:00+00:00" + steam_info.release_date.isoformat(), "2000-11-01T00:00:00+00:00" ) - self.assertEquals(game.recommended_price_eur, 8.19) - self.assertEquals(game.genres, "Action") + self.assertEquals(steam_info.recommended_price_eur, 8.19) + self.assertEquals(steam_info.genres, "Action") - self.assertGreater(game.steam_recommendations, 100000) - self.assertEquals(game.steam_percent, 96) - self.assertEquals(game.steam_score, 10) - self.assertEquals(game.metacritic_score, 88) + self.assertGreater(steam_info.recommendations, 100000) + self.assertEquals(steam_info.percent, 96) + self.assertEquals(steam_info.score, 10) + self.assertEquals(steam_info.metacritic_score, 88) self.assertEquals( - game.metacritic_url, + steam_info.metacritic_url, """https://www.metacritic.com/game/pc/counter-strike?ftag=MCD-06-10aaa1f""", ) def test_steam_appinfo2(self) -> None: driver: WebDriver with get_pagedriver() as driver: - game = get_steam_details(driver, title="Rainbow Six Siege") - self.assertIsNotNone(game) - self.assertEquals(game.name, "Tom Clancy's Rainbow Six® Siege") - self.assertIsNotNone(game.short_description) + steam_info = get_steam_details(driver, title="Rainbow Six Siege") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "Tom Clancy's Rainbow Six® Siege") + self.assertIsNotNone(steam_info.short_description) self.assertEquals( - game.release_date.isoformat(), "2015-12-01T00:00:00+00:00" + steam_info.release_date.isoformat(), "2015-12-01T00:00:00+00:00" ) - self.assertEquals(game.recommended_price_eur, 19.99) - self.assertEquals(game.genres, "Action") + self.assertEquals(steam_info.recommended_price_eur, 19.99) + self.assertEquals(steam_info.genres, "Action") - self.assertGreater(game.steam_recommendations, 850000) - self.assertEquals(game.steam_percent, 87) - self.assertEquals(game.steam_score, 9) - self.assertEquals(game.metacritic_score, None) - self.assertEquals(game.metacritic_url, None) + self.assertGreater(steam_info.recommendations, 850000) + self.assertEquals(steam_info.percent, 87) + self.assertEquals(steam_info.score, 9) + self.assertEquals(steam_info.metacritic_score, None) + self.assertEquals(steam_info.metacritic_url, None) + + def test_steam_appinfo_releasedate(self) -> None: + driver: WebDriver + with get_pagedriver() as driver: + steam_info = get_steam_details(driver, title="Guild Wars 2") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "Guild Wars 2") + self.assertIsNotNone(steam_info.release_date) + self.assertEquals( + steam_info.release_date.isoformat(), "2012-08-28T00:00:00+00:00" + ) + + def test_steam_appinfo_recommendations(self) -> None: + driver: WebDriver + with get_pagedriver() as driver: + steam_info = get_steam_details(driver, title="Riverbond") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "Riverbond") + self.assertIsNotNone(steam_info.recommendations) # This is a weird one where the price is shown in "KWR" in the JSON, so the # store page has to be used instead to get the price in EUR def test_steam_appinfo_price(self) -> None: driver: WebDriver with get_pagedriver() as driver: - game = get_steam_details(driver, title="Cities: Skylines") - self.assertIsNotNone(game) - self.assertEquals(game.name, "Cities: Skylines") - self.assertEquals(game.recommended_price_eur, 27.99) + steam_info = get_steam_details(driver, title="Cities: Skylines") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "Cities: Skylines") + self.assertEquals(steam_info.recommended_price_eur, 27.99) + + def test_steam_appinfo_language(self) -> None: + driver: WebDriver + with get_pagedriver() as driver: + steam_info = get_steam_details(driver, title="Warframe") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "Warframe") + self.assertEquals(steam_info.short_description[0:6], "Awaken") def test_steam_appinfo_ageverify(self) -> None: driver: WebDriver with get_pagedriver() as driver: - game = get_steam_details(driver, title="Doom Eternal") - self.assertIsNotNone(game) - self.assertEquals(game.name, "DOOM Eternal") - self.assertEquals(game.steam_score, 9) + steam_info = get_steam_details(driver, title="Doom Eternal") + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.name, "DOOM Eternal") + self.assertEquals(steam_info.score, 9) def test_steam_json_multiple_genres(self) -> None: with get_pagedriver() as driver: - game = get_steam_details(driver, id=1424910) - self.assertIsNotNone(game) - self.assertEquals(game.genres, "Action, Indie, Racing, Early Access") + steam_info = get_steam_details(driver, id=1424910) + self.assertIsNotNone(steam_info) + self.assertEquals(steam_info.genres, "Action, Indie, Racing, Early Access") def test_igdb_id(self) -> None: id = get_igdb_id("Cities: Skylines")