From fecc896a0b401b1f89f43018216f6a3f89a4ab6e Mon Sep 17 00:00:00 2001 From: unixtreme Date: Thu, 2 Nov 2023 18:22:22 +0900 Subject: [PATCH] chg: chg: Use pydantic to manage config schemas. #69 --- .gitignore | 1 + modules/config.py | 320 ------------------------------- modules/config/__init__.py | 143 ++++++++++++++ modules/config/schemas_v1.py | 198 +++++++++++++++++++ modules/console.py | 10 +- modules/context.py | 22 ++- modules/discord.py | 57 +++--- modules/encounter.py | 26 +-- modules/exceptions.py | 74 +++++++ modules/gui/__init__.py | 14 +- modules/gui/emulator_controls.py | 8 +- modules/http.py | 5 +- modules/main.py | 8 +- modules/modes/__init__.py | 3 + modules/modes/starters.py | 47 +++-- modules/obs.py | 8 +- modules/profiles.py | 89 +++------ modules/stats.py | 9 +- pokebot.py | 32 +++- profiles/catch_block.yml | 7 - profiles/cheats.yml | 14 -- profiles/customhooks.py | 99 +++++----- profiles/discord.yml | 54 ------ profiles/general.yml | 6 - profiles/keys.yml | 61 ------ profiles/logging.yml | 24 --- profiles/obs.yml | 21 -- requirements.py | 1 + 28 files changed, 628 insertions(+), 733 deletions(-) delete mode 100644 modules/config.py create mode 100644 modules/config/__init__.py create mode 100644 modules/config/schemas_v1.py create mode 100644 modules/exceptions.py create mode 100644 modules/modes/__init__.py delete mode 100644 profiles/catch_block.yml delete mode 100644 profiles/cheats.yml delete mode 100644 profiles/discord.yml delete mode 100644 profiles/general.yml delete mode 100644 profiles/keys.yml delete mode 100644 profiles/logging.yml delete mode 100644 profiles/obs.yml diff --git a/.gitignore b/.gitignore index 39aebb69..818700b0 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ __pycache__ .last-requirements-check build/ profiles/*/ +profiles/*.yml dist/ logs/ mgba/ diff --git a/modules/config.py b/modules/config.py deleted file mode 100644 index 0478faf2..00000000 --- a/modules/config.py +++ /dev/null @@ -1,320 +0,0 @@ -import sys -from pathlib import Path - -from jsonschema import validate -from ruamel.yaml import YAML - -from modules.console import console - -yaml = YAML() - -available_bot_modes = ["Manual", "Spin", "Starters", "Fishing", "Bunny Hop", "Rayquaza"] - -general_schema = f""" -type: object -properties: - starter: - type: string - enum: - - Treecko - - Torchic - - Mudkip - - Bulbasaur - - Charmander - - Squirtle - - Chikorita - - Totodile - - Cyndaquil -""" - -logging_schema = """ - log_encounters: - type: boolean - console: - type: object - properties: - encounter_data: - type: string - enum: - - verbose - - basic - - disable - encounter_ivs: - type: string - enum: - - verbose - - basic - - disable - encounter_moves: - type: string - enum: - - verbose - - basic - - disable - statistics: - type: string - enum: - - verbose - - basic - - disable - save_pk3: - type: object - properties: - all: - type: boolean - shiny: - type: boolean - custom: - type: boolean - import_pk3: - type: boolean -""" - -discord_schema = """ -type: object -properties: - rich_presence: - type: boolean - iv_format: - type: string - enum: - - basic - - formatted - bot_id: - type: string - shiny_pokemon_encounter: - type: object - properties: - enable: - type: boolean - ping_mode: - enum: - - ~ - - user - - role - pokemon_encounter_milestones: - type: object - properties: - enable: - type: boolean - interval: - type: integer - minimum: 0 - ping_mode: - enum: - - ~ - - user - - role - total_encounter_milestones: - type: object - properties: - enable: - type: boolean - interval: - type: integer - minimum: 0 - ping_mode: - enum: - - ~ - - user - - role - phase_summary: - type: object - properties: - enable: - type: boolean - first_interval: - type: integer - minimum: 0 - consequent_interval: - type: integer - minimum: 0 - ping_mode: - enum: - - ~ - - user - - role - anti_shiny_pokemon_encounter: - type: object - properties: - enable: - type: boolean - ping_mode: - enum: - - ~ - - user - - role -""" - -obs_schema = """ -type: object -properties: - obs_websocket: - type: object - properties: - host: - type: string - port: - type: integer - password: - type: string - shiny_delay: - type: integer - minimum: 0 - discord_delay: - type: integer - minimum: 0 - screenshot: - type: boolean - replay_buffer: - type: boolean - replay_buffer_delay: - type: integer - minimum: 0 - replay_dir: - type: string - http_server: - type: object - properties: - enable: - type: boolean - ip: - type: string - port: - type: integer -""" - -cheats_schema = """ -type: object -properties: - starters: - type: boolean - starters_rng: - type: boolean -""" - -catch_block_schema = """ -type: object -properties: - block_list: - type: array -""" - -keys_schema = """ -type: object -properties: - gba: - type: object - properties: - Up: {type: string} - Down: {type: string} - Left: {type: string} - Right: {type: string} - A: {type: string} - B: {type: string} - L: {type: string} - R: {type: string} - Start: {type: string} - Select: {type: string} - - emulator: - type: object - properties: - zoom_in: {type: string} - zoom_out: {type: string} - toggle_manual: {type: string} - toggle_video: {type: string} - toggle_audio: {type: string} - set_speed_1x: {type: string} - set_speed_2x: {type: string} - set_speed_3x: {type: string} - set_speed_4x: {type: string} - toggle_unthrottled: {type: string} - reset: {type: string} - exit: {type: string} - save_state: {type: string} - toggle_stepping_mode: {type: string} -""" - -schemas = { - "general": general_schema, - "logging": logging_schema, - "discord": discord_schema, - "obs": obs_schema, - "cheats": cheats_schema, -} - -config = {"general": {}, "logging": {}, "discord": {}, "obs": {}, "cheats": {}} - -# Keeps a list of all configuration directories that should be searched whenever we are looking -# for a particular config file. -# In practice, this will contain the global `profiles/` directory, and the profile-specific config -# directory (`profiles//config/`) once a profile has been selected by the user. -config_dir_stack: list[Path] = [] - - -def load_config(file_name: str, schema: str) -> dict: - """ - Looks for and loads a single config file and returns its parsed contents. - - If the config file cannot be found, it stops the bot. - - :param file_name: File name (without path) of the config file - :param schema: JSON Schema string to validate the configuration dict against - :return: Parsed and validated contents of the configuration file - """ - result = None - for config_dir in config_dir_stack: - file_path = config_dir / file_name - if file_path.is_file(): - result = load_config_file(file_path, schema) - - if result is None: - console.print(f"[bold red]Could not find any config file named {file_name}.[/]") - sys.exit(1) - - return result - - -def load_config_file(file_path: Path, schema: str) -> dict: - """ - Loads and validates a single config file. This requires an exact path and therefore will not - fall back to the global config directory if the file could not be found. - - It will stop the bot if the file does not exist or contains invalid data. - - :param file_path: Path to the config file - :param schema: JSON Schema string to validate the configuration dict against - :return: Parsed and validated contents of the configuration file - """ - try: - with open(file_path, mode="r", encoding="utf-8") as f: - config = yaml.load(f) - validate(config, yaml.load(schema)) - return config - except: - console.print(f"[bold red]Config file {str(file_path)} is invalid![/]") - sys.exit(1) - - -def load_config_from_directory(path: Path, allow_missing_files=False) -> None: - """ - Loads all the 'default' configuration files into the `config` variable that can be accessed by other modules. - - :param path: Path to the config directory. - :param allow_missing_files: If this is False, the function will stop the bot if it cannot find a config file. - This should be used when loading the global configuration directory, but not when - loading the profile-specific config directory (so that we use the profile-specific - config if it exists, but keep using the global one if it doesn't.) - """ - global config_dir_stack, config - - config_dir_stack.append(path) - - for key in config: - file_path = path / (key + ".yml") - if file_path.is_file(): - config[key] = load_config_file(file_path, schemas[key]) - elif not allow_missing_files: - console.print(f"[bold red]Expected a config file {str(file_path)} could not be found.[/]") - sys.exit(1) diff --git a/modules/config/__init__.py b/modules/config/__init__.py new file mode 100644 index 00000000..c8a13088 --- /dev/null +++ b/modules/config/__init__.py @@ -0,0 +1,143 @@ +"""Module for managing and accessing configuration.""" + +from pathlib import Path + +from confz import BaseConfig, FileSource +from ruamel.yaml import YAML + +from modules import exceptions +from modules.modes import available_bot_modes +from modules.runtime import get_base_path +from modules.config.schemas_v1 import CatchBlock, Cheats, Discord, General, Keys, Logging, OBS, ProfileMetadata + +# Defines which class attributes of the Config class are meant to hold required configuration data. +CONFIG_ATTRS = { + 'catch_block', + 'cheats', + 'discord', + 'general', + 'keys', + 'logging', + 'obs', +} + + +class Config: + """Initializes a config directory and provides access to the different settings.""" + + available_bot_modes = available_bot_modes + + def __init__(self, config_dir: str | Path | None = None, is_profile: bool = False, strict: bool = False) -> None: + """Initialize the configuration folder, loading all config files. + + :param config_dir: Config directory to load during initialization. + :param is_profile: Whether profile files are expected in this directory. + :param strict: Whether to allow files to be missing. + """ + self.config_dir = get_base_path() / 'profiles' if not config_dir else Path(config_dir) + self.catch_block: CatchBlock = CatchBlock() + self.cheats: Cheats = Cheats() + self.discord: Discord = Discord() + self.general: General = General() + self.is_profile = is_profile + self.keys: Keys = Keys() + self.loaded = False + self.logging: Logging = Logging() + self.metadata: ProfileMetadata | None = None + self.obs: OBS = OBS() + self.load(strict=strict) + + def load(self, config_dir: str | Path | None = None, strict: bool = True): + """Load the configuration files in the config_dir. + + :param config_dir: New config dir to load. + :param strict: Whether all files must be present in the directory. + """ + if config_dir: + self.config_dir = config_dir + + for attr in CONFIG_ATTRS: + self.reload_file(attr, strict=strict) + if self.is_profile: + file_path = self.config_dir / ProfileMetadata.filename + self.metadata = load_config_file(file_path, ProfileMetadata, strict=True) + self.loaded = True + + def save(self, config_dir: str | Path | None = None, strict: bool = True): + """Saves currently loaded configuration into files inside config_dir. + + :param config_dir: New config dir to save to. + :param strict: Whether to allow overwriting files or creating missing directories. + """ + if config_dir: + self.config_dir = config_dir + + for attr in CONFIG_ATTRS: + self.save_file(attr, strict=strict) + if self.is_profile: + self.save_file('metadata', strict=strict) + + def reload_file(self, attr: str, strict: bool = False) -> None: + """Reload a specific configuration file, using the same source. + + :param attr: The instance attribute that holds the config file to load. + :param strict: Whether all files must be present in the directory. + """ + + config_inst = getattr(self, attr, None) + if not isinstance(config_inst, BaseConfig): + raise exceptions.PrettyValueError(f'Config.{attr} is not a valid configuration to load.') + file_path = self.config_dir / config_inst.filename + config_inst = load_config_file(file_path, config_inst.__class__, strict=strict) + if config_inst: + setattr(self, attr, config_inst) + + def save_file(self, attr: str, strict: bool = False) -> None: + """Save a specific configuration file, using the same source. + + :param attr: The instance attribute that holds the config file to save. + :param strict: Whether all files must be present in the directory. + """ + + config_inst = getattr(self, attr, None) + if not isinstance(config_inst, BaseConfig): + raise exceptions.PrettyValueError(f'Config.{attr} is not a valid configuration to save.') + save_config_file(self.config_dir, config_inst, strict=strict) + + +def load_config_file(file_path: Path, config_cls: type[BaseConfig], strict: bool = False) -> BaseConfig | None: + """Helper to load files from a path without manually creating the sources. + + :param file_path: The path to the file to load. + :param config_cls: Class to instance from the specified path. + :param strict: Whether to raise an exception if the file is missing. + """ + if not file_path.is_file(): + if strict: + raise exceptions.CriticalFileMissing(file_path) + config_inst = None + else: + sources = [FileSource(file_path)] + config_inst = config_cls(config_sources=sources) + return config_inst + + +def save_config_file(config_dir: Path, config_inst: BaseConfig, strict: bool = False) -> None: + """Helper to save config data from a model into a config directory. + + :param config_dir: The directory to store the file into. + :param config_inst: Config instance to save. + :param strict: Whether to allow overwriting files or creating missing directories. + """ + if not config_dir.is_dir(): + if strict: + raise exceptions.CriticalDirectoryMissing(config_dir) + config_dir.mkdir() + if not isinstance(config_inst, BaseConfig): + raise exceptions.PrettyValueError(f'The provided config is not a valid config instance.') + config_file = config_dir / config_inst.filename + if strict and config_file.is_file(): + raise exceptions.PrettyValueError(f'The file {config_file} already exists. Refusing to overwrite it.') + yaml = YAML() + yaml.allow_unicode = False + yaml.dump(config_inst.model_dump(), config_dir / config_inst.filename) diff --git a/modules/config/schemas_v1.py b/modules/config/schemas_v1.py new file mode 100644 index 00000000..0035a3f2 --- /dev/null +++ b/modules/config/schemas_v1.py @@ -0,0 +1,198 @@ +"""Contains default schemas for configuration files.""" + +from __future__ import annotations + +from enum import Enum +from pathlib import Path +from typing import Literal + +from confz import BaseConfig, FileSource +from pydantic import field_validator, Field +from pydantic.types import Annotated, ClassVar, NonNegativeInt, PositiveInt + + +class Starters(Enum): + TREECKO = "Treecko" + TORCHIC = "Torchic" + MUDKIP = "Mudkip" + BULBASAUR = "Bulbasaur" + CHARMANDER = "Charmander" + SQUIRTLE = "Squirtle" + CHIKORITA = "Chikorita" + TOTODILE = "Totodile" + CYNDAQUIL = "Cyndaquil" + + +class CatchBlock(BaseConfig): + """Schema for the catch_block configuration.""" + + filename: ClassVar = 'catch_block.yml' + block_list: list[str] = [] + + +class Cheats(BaseConfig): + """Schema for the cheats configuration.""" + + filename: ClassVar = 'cheats.yml' + starters: bool = False + starters_rng: bool = False + + +class Discord(BaseConfig): + """Schema for the discord configuration.""" + + filename: ClassVar = 'discord.yml' + rich_presence: bool = True + iv_format: Literal["basic", "formatted"] = 'formatted' + bot_id: str = 'PokéBot' + global_webhook_url: str = '' + shiny_pokemon_encounter: DiscordWebhook = Field(default_factory=lambda: DiscordWebhook()) + pokemon_encounter_milestones: DiscordWebhook = Field(default_factory=lambda: DiscordWebhook(interval=10000)) + shiny_pokemon_encounter_milestones: DiscordWebhook = Field(default_factory=lambda: DiscordWebhook(interval=5)) + total_encounter_milestones: DiscordWebhook = Field(default_factory=lambda: DiscordWebhook(interval=25000)) + phase_summary: DiscordWebhook = Field(default_factory=lambda: DiscordWebhook()) + anti_shiny_pokemon_encounter: DiscordWebhook = Field(default_factory=lambda: DiscordWebhook()) + + +class DiscordWebhook(BaseConfig): + """Schema for the different webhooks sections contained in the Discord config.""" + + enable: bool = False + first_interval: PositiveInt | None = 8192 # Only used by phase_summary. + consequent_interval: PositiveInt | None = 5000 # Only used by phase_summary. + interval: PositiveInt = 5 + ping_mode: Literal['user', 'role', None] = None + ping_id: str | None = None + + +class General(BaseConfig): + """Schema for the general configuration.""" + + filename: ClassVar = 'general.yml' + starter: Starters = Starters.MUDKIP + + +class Keys(BaseConfig): + """Schema for the keys configuration.""" + + filename: ClassVar = 'keys.yml' + gba: KeysGBA = Field(default_factory=lambda: KeysGBA()) + emulator: KeysEmulator = Field(default_factory=lambda: KeysEmulator()) + + +class KeysEmulator(BaseConfig): + """Schema for the emulator keys section in the Keys config.""" + + zoom_in: str = 'plus' + zoom_out: str = 'minus' + toggle_manual: str = 'Tab' + toggle_video: str = 'v' + toggle_audio: str = 'b' + set_speed_1x: str = '1' + set_speed_2x: str = '2' + set_speed_3x: str = '3' + set_speed_4x: str = '4' + set_speed_unthrottled: str = '0' + reset: str = 'Ctrl+R' + exit: str = 'Ctrl+Q' + save_state: str = 'Ctrl+S' + toggle_stepping_mode: str = 'Ctrl+L' + + +class KeysGBA(BaseConfig): + """Schema for the GBA keys section in the Keys config.""" + + Up: str = 'Up' + Down: str = 'Down' + Left: str = 'Left' + Right: str = 'Right' + A: str = 'x' + B: str = 'z' + L: str = 'a' + R: str = 's' + Start: str = 'Return' + Select: str = 'BackSpace' + + +class Logging(BaseConfig): + """Schema for the logging configuration.""" + + filename: ClassVar = 'logging.yml' + console: LoggingConsole = Field(default_factory=lambda: LoggingConsole()) + save_pk3: LoggingSavePK3 = Field(default_factory=lambda: LoggingSavePK3()) + import_pk3: bool = False + log_encounters: bool = False + + +class LoggingConsole(BaseConfig): + """Schema for the console section in the Logging config.""" + + encounter_data: Literal["verbose", "basic", "disable"] = 'verbose' + encounter_ivs: Literal["verbose", "basic", "disable"] = 'verbose' + encounter_moves: Literal["verbose", "basic", "disable"] = 'disable' + statistics: Literal["verbose", "basic", "disable"] = 'verbose' + + +class LoggingSavePK3(BaseConfig): + """Schema for the save_pk3 section in the Logging config.""" + + all: bool = False + shiny: bool = False + custom: bool = False + + +class OBS(BaseConfig): + """Schema for the OBS configuration.""" + + filename: ClassVar = 'obs.yml' + discord_delay: NonNegativeInt = 0 + discord_webhook_url: str | None = None + replay_dir: Path = "./stream/replays/" + replay_buffer: bool = False + replay_buffer_delay: NonNegativeInt = 0 + screenshot: bool = False + shiny_delay: NonNegativeInt = 0 + obs_websocket: OBSWebsocket = Field(default_factory=lambda: OBSWebsocket()) + http_server: OBSHTTPServer = Field(default_factory=lambda: OBSHTTPServer()) + + @field_validator('replay_dir') + def validate_dir(cls, value: str | Path, **kwargs) -> Path: + """Ensure the replay_dir field returns a path.""" + if isinstance(value, str): + value = Path(value) + if not isinstance(value, Path): + raise ValueError(f"Expected a Path or a string, got: {type(value)}.") + return value + + +class OBSWebsocket(BaseConfig): + """Schema for the obs_websocket section in the OBS config.""" + + host: str = '127.0.0.1' + password: str = 'password' + port: Annotated[int, Field(gt=1000, lt=65536)] = 4455 + + +class OBSHTTPServer(BaseConfig): + """Schema for the http_server section in the OBS config.""" + + enable: bool = False + ip: str = '127.0.0.1' + port: Annotated[int, Field(gt=1000, lt=65536)] = 8888 + + +class ProfileMetadata(BaseConfig): + """Schema for the metadata configuration file part of profiles.""" + + filename: ClassVar = 'metadata.yml' + version: PositiveInt = 1 + rom: ProfileMetadataROM = Field(default_factory=lambda: ProfileMetadataROM()) + + +class ProfileMetadataROM(BaseConfig): + """Schema for the rom section of the metadata config.""" + + file_name: str = '' + game_code: str = '' + revision: NonNegativeInt = 0 + language: Literal['E', 'F', 'D', 'I', 'J', 'S'] = '' diff --git a/modules/console.py b/modules/console.py index 7af76778..316311c5 100644 --- a/modules/console.py +++ b/modules/console.py @@ -2,6 +2,7 @@ from rich.table import Table from rich.theme import Theme +from modules.context import context from modules.pokemon import Pokemon theme = Theme( @@ -60,14 +61,13 @@ def sv_colour(value: int) -> str: def print_stats(total_stats: dict, pokemon: Pokemon, session_pokemon: list, encounter_rate: int) -> None: - from modules.config import config type_colour = pokemon.species.types[0].name.lower() rich_name = f"[{type_colour}]{pokemon.species.name}[/]" console.print("\n") console.rule(f"{rich_name} encountered at {pokemon.location_met}", style=type_colour) - match config["logging"]["console"]["encounter_data"]: + match context.active_config.logging.console.encounter_data: case "verbose": pokemon_table = Table() pokemon_table.add_column("PID", justify="center", width=10) @@ -99,7 +99,7 @@ def print_stats(total_stats: dict, pokemon: Pokemon, session_pokemon: list, enco f"Shiny Value: {pokemon.shiny_value:,}" ) - match config["logging"]["console"]["encounter_ivs"]: + match context.active_config.logging.console.encounter_ivs: case "verbose": iv_table = Table(title=f"{pokemon.species.name} IVs") iv_table.add_column("HP", justify="center", style=iv_colour(pokemon.ivs.hp)) @@ -130,7 +130,7 @@ def print_stats(total_stats: dict, pokemon: Pokemon, session_pokemon: list, enco f"Sum: [{iv_sum_colour(pokemon.ivs.sum())}]{pokemon.ivs.sum()}[/]" ) - match config["logging"]["console"]["encounter_moves"]: + match context.active_config.logging.console.encounter_moves: case "verbose": move_table = Table(title=f"{pokemon.species.name} Moves") move_table.add_column("Name", justify="left", width=20) @@ -167,7 +167,7 @@ def print_stats(total_stats: dict, pokemon: Pokemon, session_pokemon: list, enco f"PP: {learned_move.pp}" ) - match config["logging"]["console"]["statistics"]: + match context.active_config.logging.console.statistics: case "verbose": stats_table = Table(title="Statistics") stats_table.add_column("", justify="left", width=10) diff --git a/modules/context.py b/modules/context.py index 62352ef2..db90e87f 100644 --- a/modules/context.py +++ b/modules/context.py @@ -6,9 +6,13 @@ from modules.profiles import Profile from modules.roms import ROM +from modules.config import Config + class BotContext: def __init__(self, initial_bot_mode: str = 'Manual'): + self.active_config = Config() + self.emulator: Optional["LibmgbaEmulator"] = None self.gui: Optional["PokebotGui"] = None self.profile: Optional["Profile"] = None @@ -19,6 +23,22 @@ def __init__(self, initial_bot_mode: str = 'Manual'): self._current_bot_mode: str = initial_bot_mode self._previous_bot_mode: str = 'Manual' + def reload_config(self): + """Triggers a config reload, reload the global config then specific profile config.""" + try: + new_config = Config() + new_config.load(self.active_config.config_dir, strict=False) + self.active_config = new_config + message = 'Profile settings loaded.' + except Exception as error: + if self.debug: + raise error + message = ( + 'The configuration could not be loaded, no changes have been made.\n' + 'This is Probably due to a malformed file. For more information run the bot with the --debug flag.' + ) + self.message = message + @property def message(self) -> str: return self._current_message @@ -113,4 +133,4 @@ def _update_gui(self) -> None: self.gui.on_settings_updated() -context = BotContext() +context: BotContext = BotContext() diff --git a/modules/discord.py b/modules/discord.py index acef35a4..72821bd7 100644 --- a/modules/discord.py +++ b/modules/discord.py @@ -2,7 +2,6 @@ from pathlib import Path from pypresence import Presence from discord_webhook import DiscordWebhook, DiscordEmbed -from modules.config import config from modules.context import context @@ -19,42 +18,42 @@ def discord_message( embed_footer: str = None, embed_color: str = "FFFFFF", ) -> None: - if not webhook_url: - webhook_url = config["discord"]["global_webhook_url"] - webhook, embed_obj = DiscordWebhook(url=webhook_url, content=content), None + webhook_url = webhook_url or context.active_config.discord.global_webhook_url + if webhook_url: + webhook, embed_obj = DiscordWebhook(url=webhook_url, content=content), None - if image: - with open(image, "rb") as f: - webhook.add_file(file=f.read(), filename="image.png") + if image: + with open(image, "rb") as f: + webhook.add_file(file=f.read(), filename="image.png") - if embed: - embed_obj = DiscordEmbed(title=embed_title, color=embed_color) + if embed: + embed_obj = DiscordEmbed(title=embed_title, color=embed_color) - if embed_description: - embed_obj.description = embed_description + if embed_description: + embed_obj.description = embed_description - if embed_fields: - for key, value in embed_fields.items(): - embed_obj.add_embed_field(name=key, value=value, inline=False) + if embed_fields: + for key, value in embed_fields.items(): + embed_obj.add_embed_field(name=key, value=value, inline=False) - if embed_thumbnail: - with open(embed_thumbnail, "rb") as f: - webhook.add_file(file=f.read(), filename="thumb.png") - embed_obj.set_thumbnail(url="attachment://thumb.png") + if embed_thumbnail: + with open(embed_thumbnail, "rb") as f: + webhook.add_file(file=f.read(), filename="thumb.png") + embed_obj.set_thumbnail(url="attachment://thumb.png") - if embed_image: - with open(embed_image, "rb") as f: - webhook.add_file(file=f.read(), filename="embed.png") - embed_obj.set_image(url="attachment://embed.png") + if embed_image: + with open(embed_image, "rb") as f: + webhook.add_file(file=f.read(), filename="embed.png") + embed_obj.set_image(url="attachment://embed.png") - if embed_footer: - embed_obj.set_footer(text=embed_footer) + if embed_footer: + embed_obj.set_footer(text=embed_footer) - embed_obj.set_timestamp() - webhook.add_embed(embed_obj) + embed_obj.set_timestamp() + webhook.add_embed(embed_obj) - time.sleep(config["obs"]["discord_delay"]) - webhook.execute() + time.sleep(context.active_config.obs.discord_delay) + webhook.execute() def discord_rich_presence() -> None: @@ -86,7 +85,7 @@ def discord_rich_presence() -> None: RPC.update( state=f"{location} | {context.rom.game_name}", details=( - f'{totals["totals"].get("encounters", 0):,} ({totals["totals"].get("shiny_encounters", 0):,}✨) |' + f'{totals.get("totals", {}).get("encounters", 0):,} ({totals.get("totals", {}).get("shiny_encounters", 0):,}✨) |' f" {total_stats.get_encounter_rate():,}/h" ), large_image=large_image, diff --git a/modules/encounter.py b/modules/encounter.py index 20a2d45a..7d58ec86 100644 --- a/modules/encounter.py +++ b/modules/encounter.py @@ -1,4 +1,3 @@ -from modules.config import config from modules.console import console from modules.context import context from modules.files import save_pk3 @@ -7,8 +6,6 @@ from modules.pokemon import Pokemon from modules.stats import total_stats -block_list: list = [] - def encounter_pokemon(pokemon: Pokemon) -> None: """ @@ -18,26 +15,19 @@ def encounter_pokemon(pokemon: Pokemon) -> None: :return: """ - global block_list - - if config["logging"]["save_pk3"]["all"]: + config = context.active_config + if config.logging.save_pk3.all: save_pk3(pokemon) - if pokemon.is_shiny or block_list == []: - # Load catch block config file - allows for editing while bot is running - from modules.config import catch_block_schema, load_config - - config_catch_block = load_config("catch_block.yml", catch_block_schema) - block_list = config_catch_block["block_list"] - - total_stats.log_encounter(pokemon, block_list) + total_stats.log_encounter(pokemon, config.catch_block.block_list) context.message = f"Encountered a {pokemon.species.name} with a shiny value of {pokemon.shiny_value:,}!" # TODO temporary until auto-catch is ready custom_found = total_stats.custom_catch_filters(pokemon) if pokemon.is_shiny or custom_found: if pokemon.is_shiny: - if not config["logging"]["save_pk3"]["all"] and config["logging"]["save_pk3"]["shiny"]: + config.reload_file('catch_block') + if not config.logging.save_pk3.all and config.logging.save_pk3.shiny: save_pk3(pokemon) state_tag = "shiny" console.print("[bold yellow]Shiny found!") @@ -47,7 +37,7 @@ def encounter_pokemon(pokemon: Pokemon) -> None: alert_message = f"Found a shiny {pokemon.species.name}. 🥳" elif custom_found: - if not config["logging"]["save_pk3"]["all"] and config["logging"]["save_pk3"]["custom"]: + if not config.logging.save_pk3.all and config.logging.save_pk3.custom: save_pk3(pokemon) state_tag = "customfilter" console.print("[bold green]Custom filter Pokemon found!") @@ -60,7 +50,7 @@ def encounter_pokemon(pokemon: Pokemon) -> None: alert_title = None alert_message = None - if not custom_found and pokemon.species.name in block_list: + if not custom_found and pokemon.species.name in config.catch_block.block_list: console.print(f"[bold yellow]{pokemon.species.name} is on the catch block list, skipping encounter...") else: filename_suffix = f"{state_tag}_{pokemon.species.safe_name}" @@ -68,7 +58,7 @@ def encounter_pokemon(pokemon: Pokemon) -> None: # TEMPORARY until auto-battle/auto-catch is done # if the mon is saved and imported, no need to catch it by hand - if config["logging"]["import_pk3"]: + if config.logging.import_pk3: if import_into_storage(pokemon.data): return diff --git a/modules/exceptions.py b/modules/exceptions.py new file mode 100644 index 00000000..6ba2919b --- /dev/null +++ b/modules/exceptions.py @@ -0,0 +1,74 @@ +"""Custom exception handlers.""" + +from __future__ import annotations + +import sys + +from modules.console import console +from modules.context import context + + +class PrettyException(Exception): + """Base class for all exceptions with rich print methods.""" + + exit_code: int | None = 1 + message_template: str = '{}' + message_color = '[bold red]' + recommendation: str = '' + recommendation_color = '[bold yellow]' + + def bare_message(self) -> PrettyException: + """Create an exception to raise without pretty formatting.""" + message = self.message_template.format(self.args) + message = f'{message}\n{self.recommendation}' + return PrettyException(message) + + +class PrettyValueError(PrettyException): + """Exception to print a rich message whenever a ValueError would be raised.""" + + +class CriticalDirectoryMissing(PrettyException): + """Exception for whenever a core file is missing.""" + + message_template = 'Could not load {}, the directory does not exist or is not readable.' + recommendation = 'Make sure the directory exists and the user has read access.' + + +class CriticalFileMissing(PrettyException): + """Exception for whenever a core file is missing.""" + + message_template = 'Could not load {}, file does not exist.' + recommendation = 'Please re-download the program or restore the missing file.' + + +class InvalidConfigData(PrettyException): + """Exception for whenever config file validation fails.""" + + message_template = 'Config file {} is invalid!' + recommendation = 'Please re-download the program or restore/amend the file contents.' + + +def exception_hook(exc_type: type[Exception], exc_instance: Exception, traceback) -> None: + """General handler for exceptions to remove tracebacks and highlight messages if debug is off. + + :param exc_type: Base Exception type, kept for parity with the overridden hook. + :param exc_instance: Instanced exception object being raised. + :param traceback: Traceback object, kept for parity with the overridden hook. + """ + if not isinstance(exc_instance, PrettyException): + raise exc_instance + if context.debug: + raise exc_instance.bare_message() + message = exc_instance.message_template.format(*exc_instance.args) + message = f'{exc_instance.message_color}{message}[/]' + if exc_instance.recommendation: + recommendation = f'{exc_instance.recommendation_color}{exc_instance.recommendation}[/]' + message = f'{message}\n{recommendation}' + console.print(message) + exit_code = exc_instance.exit_code + if exit_code is not None: + sys.exit(exit_code) + + +sys.excepthook = exception_hook diff --git a/modules/gui/__init__.py b/modules/gui/__init__.py index 4f22101e..60cb95ad 100644 --- a/modules/gui/__init__.py +++ b/modules/gui/__init__.py @@ -6,7 +6,6 @@ import PIL.Image import PIL.ImageTk -from modules.config import load_config, keys_schema from modules.console import console from modules.context import context from modules.game import set_rom @@ -45,13 +44,13 @@ def __init__(self, main_loop: callable, on_exit: callable): background=[("!active", "green"), ("active", "darkgreen"), ("pressed", "green")], ) - key_config = load_config("keys.yml", keys_schema) + key_config = context.active_config.keys self._gba_keys: dict[str, int] = {} - for key in input_map: - self._gba_keys[key_config["gba"][key].lower()] = input_map[key] - self._emulator_keys: dict[str, str] = {} - for action in key_config["emulator"]: - self._emulator_keys[key_config["emulator"][action].lower()] = action + for key, val in dict(key_config.gba).items(): + self._gba_keys[val.lower()] = input_map[key] + self._emulator_keys: dict = {} + for action, value in dict(key_config.emulator).items(): + self._emulator_keys[value.lower()] = action self._create_profile_screen = CreateProfileScreen( self.window, self._enable_select_profile_screen, self._run_profile @@ -129,6 +128,7 @@ def _enable_select_profile_screen(self) -> None: def _run_profile(self, profile: "Profile") -> None: self._reset_screen() context.profile = profile + context.active_config.load(profile.path, strict=False) set_rom(profile.rom) context.emulator = LibmgbaEmulator(profile, self._emulator_screen.update) diff --git a/modules/gui/emulator_controls.py b/modules/gui/emulator_controls.py index 3a44150c..a78eb084 100644 --- a/modules/gui/emulator_controls.py +++ b/modules/gui/emulator_controls.py @@ -2,7 +2,6 @@ from tkinter import Tk, ttk from typing import Union -from modules.config import available_bot_modes from modules.context import context from modules.libmgba import LibmgbaEmulator from modules.version import pokebot_name, pokebot_version @@ -57,7 +56,7 @@ def update(self) -> None: return if self.bot_mode_combobox.get() != context.bot_mode: - self.bot_mode_combobox.current(available_bot_modes.index(context.bot_mode)) + self.bot_mode_combobox.current(context.active_config.available_bot_modes.index(context.bot_mode)) self.last_known_bot_mode = context.bot_mode self._set_button_colour(self.speed_1x_button, active_condition=context.emulation_speed == 1) @@ -90,7 +89,7 @@ def handle_bot_mode_selection(event) -> None: context.bot_mode = new_bot_mode ttk.Label(group, text="Bot Mode:", justify="left").grid(row=0, sticky="W") - self.bot_mode_combobox = ttk.Combobox(group, values=available_bot_modes, width=16, state="readonly") + self.bot_mode_combobox = ttk.Combobox(group, values=context.active_config.available_bot_modes, width=16, state="readonly") self.bot_mode_combobox.bind("<>", handle_bot_mode_selection) self.bot_mode_combobox.bind("", lambda e: self.window.focus()) self.bot_mode_combobox.grid(row=1, sticky="W", padx=0) @@ -127,9 +126,12 @@ def _add_settings_controls(self, row: int, column: int): button_settings = {"width": 6, "padding": (0, 3), "cursor": "hand2"} self.toggle_video_button = ttk.Button(group, text="Video", **button_settings, command=context.toggle_video) self.toggle_audio_button = ttk.Button(group, text="Audio", **button_settings, command=context.toggle_audio) + button_settings['width'] = 14 + self.reload_config = ttk.Button(group, text="Reload Config", **button_settings, command=context.reload_config) self.toggle_video_button.grid(row=1, column=0) self.toggle_audio_button.grid(row=1, column=1) + self.reload_config.grid(row=2, columnspan=2) def _add_message_area(self, row: int, column: int, columnspan: int = 1): group = ttk.LabelFrame(self.frame, text="Message:", padding=(10, 5)) diff --git a/modules/http.py b/modules/http.py index 4b7e2d30..78dfc9b0 100644 --- a/modules/http.py +++ b/modules/http.py @@ -1,7 +1,6 @@ from flask_cors import CORS from flask import Flask, jsonify, request -from modules.config import config from modules.context import context from modules.items import get_items from modules.pokemon import get_party @@ -110,6 +109,6 @@ def http_get_routes(): server.run( debug=False, threaded=True, - host=config["obs"]["http_server"]["ip"], - port=config["obs"]["http_server"]["port"], + host=context.active_config.obs.http_server.ip, + port=context.active_config.obs.http_server.port, ) diff --git a/modules/main.py b/modules/main.py index 1fc78da9..6cb95bec 100644 --- a/modules/main.py +++ b/modules/main.py @@ -1,7 +1,6 @@ import sys from threading import Thread -from modules.config import config, load_config_from_directory from modules.console import console from modules.context import context from modules.memory import get_game_state, GameState @@ -17,14 +16,15 @@ def main_loop() -> None: try: mode = None - load_config_from_directory(context.profile.path, allow_missing_files=True) - if config["discord"]["rich_presence"]: + config = context.active_config + + if config.discord.rich_presence: from modules.discord import discord_rich_presence Thread(target=discord_rich_presence).start() - if config["obs"]["http_server"]["enable"]: + if config.obs.http_server.enable: from modules.http import http_server Thread(target=http_server).start() diff --git a/modules/modes/__init__.py b/modules/modes/__init__.py new file mode 100644 index 00000000..f68c4795 --- /dev/null +++ b/modules/modes/__init__.py @@ -0,0 +1,3 @@ +"""Contains modes of operation for the bot.""" + +available_bot_modes = ["Manual", "Spin", "Starters", "Fishing", "Bunny Hop", "Rayquaza"] diff --git a/modules/modes/starters.py b/modules/modes/starters.py index c2818535..6c1e4301 100644 --- a/modules/modes/starters.py +++ b/modules/modes/starters.py @@ -1,7 +1,6 @@ import random from enum import Enum -from modules.config import config from modules.console import console from modules.context import context from modules.encounter import encounter_pokemon @@ -11,6 +10,8 @@ from modules.pokemon import get_party, opponent_changed from modules.trainer import trainer +config = context.active_config + class Regions(Enum): KANTO_STARTERS = 0 @@ -53,13 +54,13 @@ def __init__(self) -> None: self.johto_starters: list = ["Chikorita", "Totodile", "Cyndaquil"] self.hoenn_starters: list = ["Treecko", "Torchic", "Mudkip"] - if config["general"]["starter"] in self.kanto_starters and context.rom.game_title in [ + if config.general.starter.value in self.kanto_starters and context.rom.game_title in [ "POKEMON LEAF", "POKEMON FIRE", ]: self.region: Regions = Regions.KANTO_STARTERS - elif config["general"]["starter"] in self.johto_starters and context.rom.game_title == "POKEMON EMER": + elif config.general.starter.value in self.johto_starters and context.rom.game_title == "POKEMON EMER": self.region: Regions = Regions.JOHTO_STARTERS self.start_party_length: int = 0 console.print( @@ -70,8 +71,8 @@ def __init__(self) -> None: if len(get_party()) == 6: self.update_state(ModeStarterStates.PARTY_FULL) - elif config["general"]["starter"] in self.hoenn_starters: - self.bag_position: int = BagPositions[config["general"]["starter"].upper()].value + elif config.general.starter.value in self.hoenn_starters: + self.bag_position: int = BagPositions[config.general.starter.value.upper()].value if context.rom.game_title == "POKEMON EMER": self.region = Regions.HOENN_STARTERS self.task_bag_cursor: str = "TASK_HANDLESTARTERCHOOSEINPUT" @@ -89,8 +90,8 @@ def __init__(self) -> None: else: self.state = ModeStarterStates.INCOMPATIBLE - if not config["cheats"]["starters_rng"]: - self.rng_history: list = get_rng_state_history(config["general"]["starter"]) + if not config.cheats.starters_rng: + self.rng_history: list = get_rng_state_history(config.general.starter.value) def update_state(self, state: ModeStarterStates): self.state: ModeStarterStates = state @@ -98,7 +99,7 @@ def update_state(self, state: ModeStarterStates): def step(self): if self.state == ModeStarterStates.INCOMPATIBLE: message = ( - f"Starter `{config['general']['starter']}` is incompatible, update `starter` in config " + f"Starter `{config.general.starter.value}` is incompatible, update `starter` in config " f"file `general.yml` to a valid starter for {context.rom.game_name} and restart the bot!" ) console.print(f"[red bold]{message}") @@ -125,7 +126,7 @@ def step(self): continue case ModeStarterStates.RNG_CHECK: - if config["cheats"]["starters_rng"]: + if config.cheats.starters_rng: self.update_state(ModeStarterStates.OVERWORLD) else: rng = unpack_uint32(read_symbol("gRngValue")) @@ -133,7 +134,7 @@ def step(self): pass else: self.rng_history.append(rng) - save_rng_state_history(config["general"]["starter"], self.rng_history) + save_rng_state_history(config.general.starter.value, self.rng_history) self.update_state(ModeStarterStates.OVERWORLD) continue @@ -146,7 +147,7 @@ def step(self): continue case ModeStarterStates.INJECT_RNG: - if config["cheats"]["starters_rng"]: + if config.cheats.starters_rng: write_symbol("gRngValue", pack_uint32(random.randint(0, 2**32 - 1))) self.update_state(ModeStarterStates.SELECT_STARTER) @@ -167,7 +168,7 @@ def step(self): continue case ModeStarterStates.EXIT_MENUS: - if not config["cheats"]["starters"]: + if not config.cheats.starters: if trainer.get_facing_direction() != "Down": context.emulator.press_button("B") context.emulator.hold_button("Down") @@ -227,7 +228,7 @@ def step(self): continue case ModeStarterStates.INJECT_RNG: - if config["cheats"]["starters_rng"]: + if config.cheats.starters_rng: write_symbol("gRngValue", pack_uint32(random.randint(0, 2**32 - 1))) self.update_state(ModeStarterStates.YES_NO) @@ -240,7 +241,7 @@ def step(self): continue case ModeStarterStates.RNG_CHECK: - if config["cheats"]["starters_rng"]: + if config.cheats.starters_rng: self.update_state(ModeStarterStates.CONFIRM_STARTER) else: rng = unpack_uint32(read_symbol("gRngValue")) @@ -248,7 +249,7 @@ def step(self): pass else: self.rng_history.append(rng) - save_rng_state_history(config["general"]["starter"], self.rng_history) + save_rng_state_history(config.general.starter.value, self.rng_history) self.update_state(ModeStarterStates.CONFIRM_STARTER) continue @@ -260,7 +261,7 @@ def step(self): continue case ModeStarterStates.EXIT_MENUS: - if config["cheats"]["starters"]: + if config.cheats.starters: self.update_state(ModeStarterStates.CHECK_STARTER) continue else: @@ -273,10 +274,8 @@ def step(self): continue case ModeStarterStates.CHECK_STARTER: - config["cheats"]["starters"] = True # TODO temporary until menu navigation is ready - if config["cheats"][ - "starters" - ]: # TODO check Pokémon summary screen once menu navigation merged + config.cheats.starters = True # TODO temporary until menu navigation is ready + if config.cheats.starters: # TODO check Pokémon summary screen once menu navigation merged self.update_state(ModeStarterStates.LOG_STARTER) continue @@ -310,7 +309,7 @@ def step(self): continue case ModeStarterStates.INJECT_RNG: - if config["cheats"]["starters_rng"]: + if config.cheats.starters_rng: write_symbol("gRngValue", pack_uint32(random.randint(0, 2**32 - 1))) self.update_state(ModeStarterStates.BAG_MENU) @@ -336,7 +335,7 @@ def step(self): continue case ModeStarterStates.RNG_CHECK: - if config["cheats"]["starters_rng"]: + if config.cheats.starters_rng: self.update_state(ModeStarterStates.CONFIRM_STARTER) else: rng = unpack_uint32(read_symbol("gRngValue")) @@ -344,12 +343,12 @@ def step(self): pass else: self.rng_history.append(rng) - save_rng_state_history(config["general"]["starter"], self.rng_history) + save_rng_state_history(config.general.starter.value, self.rng_history) self.update_state(ModeStarterStates.CONFIRM_STARTER) continue case ModeStarterStates.CONFIRM_STARTER: - if config["cheats"]["starters"]: + if config.cheats.starters: if len(get_party()) > 0: self.update_state(ModeStarterStates.LOG_STARTER) context.emulator.press_button("A") diff --git a/modules/obs.py b/modules/obs.py index 919cc9af..e8d3aeda 100644 --- a/modules/obs.py +++ b/modules/obs.py @@ -1,15 +1,15 @@ import obsws_python as obs -from modules.config import config +from modules.context import context def obs_hot_key( obs_key: str, pressCtrl: bool = False, pressShift: bool = False, pressAlt: bool = False, pressCmd: bool = False ): with obs.ReqClient( - host=config["obs"]["obs_websocket"]["host"], - port=config["obs"]["obs_websocket"]["port"], - password=config["obs"]["obs_websocket"]["password"], + host=context.active_config.obs.obs_websocket.host, + port=context.active_config.obs.obs_websocket.port, + password=context.active_config.obs.obs_websocket.password, timeout=5, ) as client: client.trigger_hot_key_by_key_sequence( diff --git a/modules/profiles.py b/modules/profiles.py index ad505ae6..958bc51d 100644 --- a/modules/profiles.py +++ b/modules/profiles.py @@ -3,42 +3,15 @@ from datetime import datetime from pathlib import Path -import jsonschema -from ruamel.yaml import YAML - +from modules import exceptions +from modules.config import load_config_file, save_config_file +from modules.config.schemas_v1 import ProfileMetadata, ProfileMetadataROM from modules.console import console from modules.roms import ROMS_DIRECTORY, ROM, list_available_roms, load_rom_data from modules.runtime import get_base_path PROFILES_DIRECTORY = get_base_path() / "profiles" -metadata_schema = """ -type: object -properties: - version: - type: integer - enum: - - 1 - rom: - type: object - properties: - file_name: - type: string - game_code: - type: string - revision: - type: integer - language: - type: string - enum: - - E - - F - - D - - I - - J - - S -""" - @dataclass class Profile: @@ -65,6 +38,8 @@ def list_available_profiles() -> list[Profile]: profiles = [] for entry in PROFILES_DIRECTORY.iterdir(): + if entry.name.startswith('_'): + continue try: profiles.append(load_profile(entry)) except RuntimeError: @@ -77,43 +52,32 @@ def load_profile_by_name(name: str) -> Profile: return load_profile(PROFILES_DIRECTORY / name) -def load_profile(path) -> Profile: +def load_profile(path: Path) -> Profile: if not path.is_dir(): raise RuntimeError("Path is not a valid profile directory.") - - metadata_file = path / "metadata.yml" - if not metadata_file.is_file(): - raise RuntimeError("Path is not a valid profile directory.") - - try: - metadata = YAML().load(metadata_file) - jsonschema.validate(metadata, YAML().load(metadata_schema)) - except: - console.print(f'[bold red]Metadata file for profile "{path.name}" is invalid![/]') - sys.exit(1) - + metadata = load_config_file(path / ProfileMetadata.filename, ProfileMetadata, strict=True) current_state = path / "current_state.ss1" if current_state.exists(): last_played = datetime.fromtimestamp(current_state.stat().st_mtime) else: last_played = None - rom_file = ROMS_DIRECTORY / metadata["rom"]["file_name"] + rom_file = ROMS_DIRECTORY / metadata.rom.file_name if rom_file.is_file(): rom = load_rom_data(rom_file) return Profile(rom, path, last_played) else: for rom in list_available_roms(): - if ( - rom.game_code == metadata["rom"]["game_code"] - and rom.revision == metadata["rom"]["revision"] - and rom.language == metadata["rom"]["language"] - ): + if all([ + rom.game_code == metadata.rom.game_code, + rom.revision == metadata.rom.revision, + rom.language == metadata.rom.language, + ]): return Profile(rom, path, last_played) console.print( - f"[bold red]Could not find ROM `{metadata['rom']['file_name']}` for profile `{path.name}`, " - f"please place `{metadata['rom']['file_name']}` into `{ROMS_DIRECTORY}`!" + f"[bold red]Could not find ROM `{metadata.rom.file_name}` for profile `{path.name}`, " + f"please place `{metadata.rom.file_name}` into `{ROMS_DIRECTORY}`!" ) sys.exit(1) @@ -123,24 +87,19 @@ def profile_directory_exists(name: str) -> bool: def create_profile(name: str, rom: ROM) -> Profile: + if name.startswith('_'): + raise exceptions.PrettyValueError(f'Profile names cannot start with the underscore "_" character.') profile_directory = PROFILES_DIRECTORY / name if profile_directory.exists(): raise RuntimeError(f'There already is a profile called "{name}", cannot create a new one with that name.') - profile_directory.mkdir() - yaml = YAML() - yaml.allow_unicode = False - yaml.dump( - { - "version": 1, - "rom": { - "file_name": rom.file.name, - "game_code": rom.game_code, - "revision": rom.revision, - "language": str(rom.language), - }, - }, - profile_directory / "metadata.yml", + rom_cfg = ProfileMetadataROM( + file_name=rom.file.name, + game_code=rom.game_code, + revision=rom.revision, + language=str(rom.language), ) + profile_metadata = ProfileMetadata(rom=rom_cfg) + save_config_file(profile_directory, profile_metadata, strict=False) return Profile(rom, profile_directory, None) diff --git a/modules/stats.py b/modules/stats.py index 4b733c17..08c14cc0 100644 --- a/modules/stats.py +++ b/modules/stats.py @@ -7,8 +7,7 @@ from threading import Thread from datetime import datetime -from modules.config import config -from modules.console import console, print_stats +from modules.console import print_stats from modules.context import context from modules.csv import log_encounter_to_csv from modules.files import read_file, write_file @@ -315,7 +314,7 @@ def log_encounter(self, pokemon: Pokemon, block_list: list) -> None: self.update_sv_records(pokemon) self.update_iv_records(pokemon) - if config["logging"]["log_encounters"]: + if context.active_config.logging.log_encounters: log_encounter_to_csv(self.total_stats, pokemon.to_dict(), self.stats_dir_path) self.update_shiny_averages(pokemon) @@ -328,10 +327,10 @@ def log_encounter(self, pokemon: Pokemon, block_list: list) -> None: self.update_shiny_incremental_stats(pokemon) # TODO fix all this OBS crap - for i in range(config["obs"].get("shiny_delay", 1)): + for i in range(context.active_config.obs.get("shiny_delay", 1)): context.emulator.run_single_frame() # TODO bad (needs to be refactored so main loop advances frame) - if config["obs"]["screenshot"]: + if context.active_config.obs.screenshot: from modules.obs import obs_hot_key while get_game_state() != GameState.BATTLE: diff --git a/pokebot.py b/pokebot.py index 8ad1ac26..a20351b3 100644 --- a/pokebot.py +++ b/pokebot.py @@ -2,10 +2,13 @@ import argparse import atexit +import pathlib import platform from dataclasses import dataclass -from modules.runtime import is_bundled_app, get_base_path +from modules import exceptions # Import base module to ensure the custom exception hook is applied. +from modules.modes import available_bot_modes +from modules.runtime import is_bundled_app from modules.version import pokebot_name, pokebot_version OS_NAME = platform.system() @@ -43,6 +46,18 @@ class StartupSettings: no_audio: bool emulation_speed: int always_on_top: bool + config_path: str + + +def directory_arg(value: str) -> pathlib.Path: + """Determine if the value is a valid readable directory. + + :param value: Directory to verify. + """ + path_obj = pathlib.Path(value) + if not path_obj.is_dir() or not path_obj.exists(): + raise exceptions.CriticalDirectoryMissing(value) + return path_obj def parse_arguments() -> StartupSettings: @@ -53,19 +68,20 @@ def parse_arguments() -> StartupSettings: nargs="?", help="Profile to initialize. Otherwise, the profile selection menu will appear.", ) - parser.add_argument("-m", "--bot-mode", choices=available_bot_modes, help="Initial bot mode (default: Manual)") + parser.add_argument("-m", "--bot-mode", choices=available_bot_modes, help="Initial bot mode (default: Manual).") parser.add_argument( "-s", "--emulation-speed", choices=["0", "1", "2", "3", "4"], help="Initial emulation speed (0 for unthrottled; default: 1)", ) - parser.add_argument("-nv", "--no-video", action="store_true", help="Turn off video output by default") - parser.add_argument("-na", "--no-audio", action="store_true", help="Turn off audio output by default") + parser.add_argument("-nv", "--no-video", action="store_true", help="Turn off video output by default.") + parser.add_argument("-na", "--no-audio", action="store_true", help="Turn off audio output by default.") parser.add_argument( - "-t", "--always-on-top", action="store_true", help="Keep the bot window always on top of other windows" + "-t", "--always-on-top", action="store_true", help="Keep the bot window always on top of other windows." ) - parser.add_argument("-d", "--debug", action="store_true", help="Enable extra debug options and a debug menu") + parser.add_argument("-d", "--debug", action="store_true", help="Enable extra debug options and a debug menu.") + parser.add_argument("-c", "--config", type=directory_arg, dest="config_path", help=argparse.SUPPRESS) args = parser.parse_args() preselected_profile: Profile | None = None @@ -80,6 +96,7 @@ def parse_arguments() -> StartupSettings: no_audio=bool(args.no_audio), emulation_speed=int(args.emulation_speed or "1"), always_on_top=bool(args.always_on_top), + config_path=args.config_path, ) @@ -89,15 +106,12 @@ def parse_arguments() -> StartupSettings: check_requirements() - from modules.config import load_config_from_directory, available_bot_modes from modules.context import context from modules.console import console from modules.gui import PokebotGui from modules.main import main_loop from modules.profiles import Profile, profile_directory_exists, load_profile_by_name - load_config_from_directory(get_base_path() / "profiles") - # This catches the signal Windows emits when the underlying console window is closed # by the user. We still want to save the emulator state in that case, which would not # happen by default! diff --git a/profiles/catch_block.yml b/profiles/catch_block.yml deleted file mode 100644 index eaabb746..00000000 --- a/profiles/catch_block.yml +++ /dev/null @@ -1,7 +0,0 @@ -# Catch block list config -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#catch_blockyml---catch-block-config - -block_list: - - PokemonName1 - - PokemonName2 - - PokemonName3 \ No newline at end of file diff --git a/profiles/cheats.yml b/profiles/cheats.yml deleted file mode 100644 index 4e659c98..00000000 --- a/profiles/cheats.yml +++ /dev/null @@ -1,14 +0,0 @@ -# Cheats config -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#cheatsyml---cheats-config - -starters: false # `true`, `false` -starters_rng: false # `true`, `false` - -# ----------------------------------------- -# Everything below is not implemented, yet™ -# ----------------------------------------- - -# TODO RNG manipulation -# TODO egg resets -# TODO instant find Feebas tile -# TODO instant find roamer map diff --git a/profiles/customhooks.py b/profiles/customhooks.py index c60709b0..722d04cc 100644 --- a/profiles/customhooks.py +++ b/profiles/customhooks.py @@ -3,13 +3,14 @@ import time import random from threading import Thread -from modules.config import config from modules.console import console from modules.context import context from modules.discord import discord_message from modules.pokemon import Pokemon from modules.runtime import get_sprites_path +config = context.active_config + def custom_hooks(hook) -> None: """ @@ -29,7 +30,7 @@ def custom_hooks(hook) -> None: # Discord messages def IVField() -> str: # Formatted IV table - if config["discord"]["iv_format"] == "formatted": + if config.discord.iv_format == "formatted": iv_field = ( "```" "╔═══╤═══╤═══╤═══╤═══╤═══╗\n" @@ -78,21 +79,21 @@ def PhaseSummary() -> dict: try: # Discord shiny Pokémon encountered - if config["discord"]["shiny_pokemon_encounter"]["enable"] and pokemon.is_shiny: + if config.discord.shiny_pokemon_encounter.enable and pokemon.is_shiny: # Discord pings discord_ping = "" - match config["discord"]["shiny_pokemon_encounter"]["ping_mode"]: + match config.discord.shiny_pokemon_encounter.ping_mode: case "role": - discord_ping = f"📢 <@&{config['discord']['shiny_pokemon_encounter']['ping_id']}>" + discord_ping = f"📢 <@&{config.discord.shiny_pokemon_encounter.ping_id}>" case "user": - discord_ping = f"📢 <@{config['discord']['shiny_pokemon_encounter']['ping_id']}>" + discord_ping = f"📢 <@{config.discord.shiny_pokemon_encounter.ping_id}>" block = ( "\n❌Skipping catching shiny (on catch block list)!" if pokemon.species.name in block_list else "" ) discord_message( - webhook_url=config["discord"]["shiny_pokemon_encounter"].get("webhook_url", None), + webhook_url=config.discord.shiny_pokemon_encounter.get("webhook_url", None), content=f"Encountered a shiny ✨ {pokemon.species.name} ✨! {block}\n{discord_ping}", embed=True, embed_title="Shiny encountered!", @@ -107,7 +108,7 @@ def PhaseSummary() -> dict: } | PhaseSummary(), embed_thumbnail=get_sprites_path() / "pokemon" / "shiny" / f"{pokemon.species.safe_name}.png", - embed_footer=f"PokéBot ID: {config['discord']['bot_id']} | {context.rom.game_name}", + embed_footer=f"PokéBot ID: {config.discord.bot_id} | {context.rom.game_name}", embed_color="ffd242", ) except: @@ -116,25 +117,25 @@ def PhaseSummary() -> dict: try: # Discord Pokémon encounter milestones if ( - config["discord"]["pokemon_encounter_milestones"]["enable"] + config.discord.pokemon_encounter_milestones.enable and stats["pokemon"][pokemon.species.name].get("encounters", -1) - % config["discord"]["pokemon_encounter_milestones"].get("interval", 0) + % config.discord.pokemon_encounter_milestones.get("interval", 0) == 0 ): # Discord pings discord_ping = "" - match config["discord"]["pokemon_encounter_milestones"]["ping_mode"]: + match config.discord.pokemon_encounter_milestones.ping_mode: case "role": - discord_ping = f"📢 <@&{config['discord']['pokemon_encounter_milestones']['ping_id']}>" + discord_ping = f"📢 <@&{config.discord.pokemon_encounter_milestones.ping_id}>" case "user": - discord_ping = f"📢 <@{config['discord']['pokemon_encounter_milestones']['ping_id']}>" + discord_ping = f"📢 <@{config.discord.pokemon_encounter_milestones.ping_id}>" discord_message( - webhook_url=config["discord"]["pokemon_encounter_milestones"].get("webhook_url", None), + webhook_url=config.discord.pokemon_encounter_milestones.get("webhook_url", None), content=f"🎉 New milestone achieved!\n{discord_ping}", embed=True, embed_description=f"{stats['pokemon'][pokemon.species.name].get('encounters', 0):,} {pokemon.species.name} encounters!", embed_thumbnail=get_sprites_path() / "pokemon" / "normal" / f"{pokemon.species.safe_name}.png", - embed_footer=f"PokéBot ID: {config['discord']['bot_id']} | {context.rom.game_name}", + embed_footer=f"PokéBot ID: {config.discord.bot_id} | {context.rom.game_name}", embed_color="50C878", ) except: @@ -143,26 +144,26 @@ def PhaseSummary() -> dict: try: # Discord shiny Pokémon encounter milestones if ( - config["discord"]["shiny_pokemon_encounter_milestones"]["enable"] + config.discord.shiny_pokemon_encounter_milestones.enable and pokemon.is_shiny and stats["pokemon"][pokemon.species.name].get("shiny_encounters", -1) - % config["discord"]["shiny_pokemon_encounter_milestones"].get("interval", 0) + % config.discord.shiny_pokemon_encounter_milestones.get("interval", 0) == 0 ): # Discord pings discord_ping = "" - match config["discord"]["shiny_pokemon_encounter_milestones"]["ping_mode"]: + match config.discord.shiny_pokemon_encounter_milestones.ping_mode: case "role": - discord_ping = f"📢 <@&{config['discord']['shiny_pokemon_encounter_milestones']['ping_id']}>" + discord_ping = f"📢 <@&{config.discord.shiny_pokemon_encounter_milestones.ping_id}>" case "user": - discord_ping = f"📢 <@{config['discord']['shiny_pokemon_encounter_milestones']['ping_id']}>" + discord_ping = f"📢 <@{config.discord.shiny_pokemon_encounter_milestones.ping_id}>" discord_message( - webhook_url=config["discord"]["shiny_pokemon_encounter_milestones"].get("webhook_url", None), + webhook_url=config.discord.shiny_pokemon_encounter_milestones.get("webhook_url", None), content=f"🎉 New milestone achieved!\n{discord_ping}", embed=True, embed_description=f"{stats['pokemon'][pokemon.species.name].get('shiny_encounters', 0):,} shiny ✨ {pokemon.species.name} ✨ encounters!", embed_thumbnail=get_sprites_path() / "pokemon" / "shiny" / f"{pokemon.species.safe_name}.png", - embed_footer=f"PokéBot ID: {config['discord']['bot_id']} | {context.rom.game_name}", + embed_footer=f"PokéBot ID: {config.discord.bot_id} | {context.rom.game_name}", embed_color="ffd242", ) except: @@ -171,18 +172,18 @@ def PhaseSummary() -> dict: try: # Discord total encounter milestones if ( - config["discord"]["total_encounter_milestones"]["enable"] + config.discord.total_encounter_milestones.enable and stats["totals"].get("encounters", -1) - % config["discord"]["total_encounter_milestones"].get("interval", 0) + % config.discord.total_encounter_milestones.get("interval", 0) == 0 ): # Discord pings discord_ping = "" - match config["discord"]["total_encounter_milestones"]["ping_mode"]: + match config.discord.total_encounter_milestones.ping_mode: case "role": - discord_ping = f"📢 <@&{config['discord']['total_encounter_milestones']['ping_id']}>" + discord_ping = f"📢 <@&{config.discord.total_encounter_milestones.ping_id}>" case "user": - discord_ping = f"📢 <@{config['discord']['total_encounter_milestones']['ping_id']}>" + discord_ping = f"📢 <@{config.discord.total_encounter_milestones.ping_id}>" embed_thumbnail = random.choice( [ @@ -204,12 +205,12 @@ def PhaseSummary() -> dict: ) discord_message( - webhook_url=config["discord"]["total_encounter_milestones"].get("webhook_url", None), + webhook_url=config.discord.total_encounter_milestones.get("webhook_url", None), content=f"🎉 New milestone achieved!\n{discord_ping}", embed=True, embed_description=f"{stats['totals'].get('encounters', 0):,} total encounters!", embed_thumbnail=get_sprites_path() / "items" / f"{embed_thumbnail}.png", - embed_footer=f"PokéBot ID: {config['discord']['bot_id']} | {context.rom.game_name}", + embed_footer=f"PokéBot ID: {config.discord.bot_id} | {context.rom.game_name}", embed_color="50C878", ) except: @@ -218,33 +219,33 @@ def PhaseSummary() -> dict: try: # Discord phase encounter notifications if ( - config["discord"]["phase_summary"]["enable"] + config.discord.phase_summary.enable and not pokemon.is_shiny and ( stats["totals"].get("phase_encounters", -1) - == config["discord"]["phase_summary"].get("first_interval", 0) + == config.discord.phase_summary.get("first_interval", 0) or ( stats["totals"].get("phase_encounters", -1) - > config["discord"]["phase_summary"].get("first_interval", 0) + > config.discord.phase_summary.get("first_interval", 0) and stats["totals"].get("phase_encounters", -1) - % config["discord"]["phase_summary"].get("consequent_interval", 0) + % config.discord.phase_summary.get("consequent_interval", 0) == 0 ) ) ): # Discord pings discord_ping = "" - match config["discord"]["phase_summary"]["ping_mode"]: + match config.discord.phase_summary.ping_mode: case "role": - discord_ping = f"📢 <@&{config['discord']['phase_summary']['ping_id']}>" + discord_ping = f"📢 <@&{config.discord.phase_summary.ping_id}>" case "user": - discord_ping = f"📢 <@{config['discord']['phase_summary']['ping_id']}>" + discord_ping = f"📢 <@{config.discord.phase_summary.ping_id}>" discord_message( - webhook_url=config["discord"]["phase_summary"].get("webhook_url", None), + webhook_url=config.discord.phase_summary.get("webhook_url", None), content=f"💀 The current phase has reached {stats['totals'].get('phase_encounters', 0):,} encounters!\n{discord_ping}", embed=True, embed_fields=PhaseSummary(), - embed_footer=f"PokéBot ID: {config['discord']['bot_id']} | {context.rom.game_name}", + embed_footer=f"PokéBot ID: {config.discord.bot_id} | {context.rom.game_name}", embed_color="D70040", ) except: @@ -252,16 +253,16 @@ def PhaseSummary() -> dict: try: # Discord anti-shiny Pokémon encountered - if config["discord"]["anti_shiny_pokemon_encounter"]["enable"] and pokemon.is_anti_shiny: + if config.discord.anti_shiny_pokemon_encounter.enable and pokemon.is_anti_shiny: # Discord pings discord_ping = "" - match config["discord"]["anti_shiny_pokemon_encounter"]["ping_mode"]: + match config.discord.anti_shiny_pokemon_encounter.ping_mode: case "role": - discord_ping = f"📢 <@&{config['discord']['anti_shiny_pokemon_encounter']['ping_id']}>" + discord_ping = f"📢 <@&{config.discord.anti_shiny_pokemon_encounter.ping_id}>" case "user": - discord_ping = f"📢 <@{config['discord']['anti_shiny_pokemon_encounter']['ping_id']}>" + discord_ping = f"📢 <@{config.discord.anti_shiny_pokemon_encounter.ping_id}>" discord_message( - webhook_url=config["discord"]["anti_shiny_pokemon_encounter"].get("webhook_url", None), + webhook_url=config.discord.anti_shiny_pokemon_encounter.get("webhook_url", None), content=f"Encountered an anti-shiny 💀 {pokemon.species.name} 💀!\n{discord_ping}", embed=True, embed_title="Anti-Shiny encountered!", @@ -274,7 +275,7 @@ def PhaseSummary() -> dict: } | PhaseSummary(), embed_thumbnail=get_sprites_path() / "pokemon" / "anti-shiny" / f"{pokemon.species.safe_name}.png", - embed_footer=f"PokéBot ID: {config['discord']['bot_id']} | {context.rom.game_name}", + embed_footer=f"PokéBot ID: {config.discord.bot_id} | {context.rom.game_name}", embed_color="000000", ) except: @@ -286,13 +287,13 @@ def PhaseSummary() -> dict: try: # Post the most recent OBS stream screenshot to Discord # (screenshot is taken in stats.py before phase resets) - if config["obs"]["discord_webhook_url"] and pokemon.is_shiny: + if config.obs.discord_webhook_url and pokemon.is_shiny: def OBSDiscordScreenshot(): time.sleep(3) # Give the screenshot some time to save to disk - images = glob.glob(f"{config['obs']['replay_dir']}*.png") + images = glob.glob(f"{config.obs.replay_dir}*.png") image = max(images, key=os.path.getctime) - discord_message(webhook_url=config["obs"].get("discord_webhook_url", None), image=image) + discord_message(webhook_url=config.obs.get("discord_webhook_url", None), image=image) # Run in a thread to not hold up other hooks Thread(target=OBSDiscordScreenshot).start() @@ -301,12 +302,12 @@ def OBSDiscordScreenshot(): try: # Save OBS replay buffer n frames after encountering a shiny - if config["obs"]["replay_buffer"] and pokemon.is_shiny: + if config.obs.replay_buffer and pokemon.is_shiny: def OBSReplayBuffer(): from modules.obs import obs_hot_key - time.sleep(config["obs"].get("replay_buffer_delay", 0)) + time.sleep(config.obs.get("replay_buffer_delay", 0)) obs_hot_key("OBS_KEY_F12", pressCtrl=True) # Run in a thread to not hold up other hooks diff --git a/profiles/discord.yml b/profiles/discord.yml deleted file mode 100644 index 18935d5b..00000000 --- a/profiles/discord.yml +++ /dev/null @@ -1,54 +0,0 @@ -# Discord settings -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#discordyml---discord-integration-config - -rich_presence: false # `true`, `false` -global_webhook_url: https://discord.com/api/webhooks/ -iv_format: formatted # `basic`, `formatted` -bot_id: PokéBot # Any string of text - -# Shiny Pokémon encounters -shiny_pokemon_encounter: - enable: false - ping_mode: - ping_id: - #webhook_url: - -# Pokémon encounter milestones -pokemon_encounter_milestones: - enable: false - interval: 10000 - ping_mode: - ping_id: - #webhook_url: - -# Shiny Pokémon encounter milestones -shiny_pokemon_encounter_milestones: - enable: false - interval: 5 - ping_mode: - ping_id: - #webhook_url: - -# Total encounter milestones -total_encounter_milestones: - enable: false - interval: 25000 - ping_mode: - ping_id: - #webhook_url: - -# Phase summary -phase_summary: - enable: false - first_interval: 8192 - consequent_interval: 5000 - ping_mode: - ping_id: - #webhook_url: - -# Anti-Shiny Pokémon encounters -anti_shiny_pokemon_encounter: - enable: false - ping_mode: - ping_id: - #webhook_url: diff --git a/profiles/general.yml b/profiles/general.yml deleted file mode 100644 index 443e16ff..00000000 --- a/profiles/general.yml +++ /dev/null @@ -1,6 +0,0 @@ -# General settings -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#generalyml---general-config - -# Starter - used when bot mode is set to `starters` -# `Treecko`, `Torchic`, `Mudkip`, `Bulbasaur`, `Charmander`, `Squirtle`, `Chikorita`, `Cyndaquil`, `Totodile` -starter: Mudkip diff --git a/profiles/keys.yml b/profiles/keys.yml deleted file mode 100644 index 2be4d657..00000000 --- a/profiles/keys.yml +++ /dev/null @@ -1,61 +0,0 @@ -# Emulator input mapping settings -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#keysyml---emulator-input-mapping - -# Default mGBA mapping -gba: - Up: 'Up' - Down: 'Down' - Left: 'Left' - Right: 'Right' - A: 'x' - B: 'z' - L: 'a' - R: 's' - Start: 'Return' - Select: 'BackSpace' - -# Keys that trigger emulator features -# -# You can optionally prefix a key code with 'Ctrl+' so that it only works if the Ctrl -# modifier key is held at the same time. -emulator: - # Increases the zoom level by 1 - zoom_in: 'plus' - - # Decreases the zoom level by 1 - zoom_out: 'minus' - - # Switches between the 'manual' bot mode and whatever is configured in `general.yml` - toggle_manual: 'Tab' - - # Disables or enables video output (disabling speeds up emulation) - toggle_video: 'v' - - # Turns sound on or off - toggle_audio: 'b' - - # Sets the emulation speed to 1×/2×/3×/4× - set_speed_1x: '1' - set_speed_2x: '2' - set_speed_3x: '3' - set_speed_4x: '4' - - # Disables throttling entirely, i.e. the emulator now runs as fast as your CPU allows - set_speed_unthrottled: '0' - - # Reset emulator core/reboot game - reset: 'Ctrl+R' - - # Exit emulator and bot - exit: 'Ctrl+Q' - - # Create a save state - save_state: 'Ctrl+S' - - # Open the load save state selection menu - load_state: 'Ctrl+L' - - # -- Ignore this unless you want to do programming on the bot code -- - # Allows you to put the emulator into 'stepping mode', where frames need to be advanced - # manually using a button. Useful for analysing memory values. - toggle_stepping_mode: 'Ctrl+P' diff --git a/profiles/logging.yml b/profiles/logging.yml deleted file mode 100644 index e009c2bb..00000000 --- a/profiles/logging.yml +++ /dev/null @@ -1,24 +0,0 @@ -# Logging/console output options -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#loggingyml---logging-and-console-output-config - -# Log encounters to .csv files -log_encounters: false # `true`, `false` - -# Console output -# `verbose`, `basic`, `disable` -console: - encounter_data: verbose - encounter_ivs: verbose - encounter_moves: disable - statistics: verbose - -# Save .pk3 files -# `true`, `false` -save_pk3: - all: false - shiny: true - custom: true - -# Automatically load .pk3 file into PC storage -# `true`, `false` -import_pk3: false diff --git a/profiles/obs.yml b/profiles/obs.yml deleted file mode 100644 index d040b51a..00000000 --- a/profiles/obs.yml +++ /dev/null @@ -1,21 +0,0 @@ -# OBS settings -# See readme for documentation: https://github.com/40Cakes/pokebot-gen3#obsyml---obs-config - -obs_websocket: - host: 127.0.0.1 - port: 4455 - password: password - -shiny_delay: 0 -discord_delay: 0 -screenshot: false # `true`, `false` -replay_buffer: false # `true`, `false` -replay_buffer_delay: 0 -discord_webhook_url: -replay_dir: "./stream/replays/" - -# Web server -http_server: - enable: false # `true`, `false` - ip: 127.0.0.1 - port: 8888 diff --git a/requirements.py b/requirements.py index 474922e2..e5eeb34c 100644 --- a/requirements.py +++ b/requirements.py @@ -12,6 +12,7 @@ # This is a list of requirements for `pip`, akin to `requirements.txt`. required_modules = [ + "confz==2.0.1", "numpy~=1.26.1", "Flask~=2.3.2", "Flask-Cors~=4.0.0",