diff --git a/pan_os_upgrade/components/device.py b/pan_os_upgrade/components/device.py index 0751103..3086322 100644 --- a/pan_os_upgrade/components/device.py +++ b/pan_os_upgrade/components/device.py @@ -35,42 +35,34 @@ # Common setup for all subcommands def common_setup( - hostname: str, - username: str, - password: str, settings_file: LazySettings, settings_file_path: Path, -) -> PanDevice: +) -> None: """ - Initializes the environment for interacting with a Palo Alto Networks device, including directory setup, logging configuration, and establishing a device connection. + Initializes the environment for interacting with a Palo Alto Networks device, including directory setup and logging configuration. - This function consolidates essential preparatory steps required before performing operations on a Palo Alto Networks device. It ensures the creation of necessary directories for organized data storage and logs, sets up logging with a configurable verbosity level, and establishes a secure connection to the device using the provided API credentials. The function is designed to return a `PanDevice` object, which could be a `Firewall` or `Panorama` instance, ready for subsequent API interactions. + This function consolidates essential preparatory steps required before performing operations on a Palo Alto Networks device. It ensures the creation of necessary directories for organized data storage and logs, and sets up logging with a configurable verbosity level. Parameters ---------- - hostname : str - The network address or DNS name of the Palo Alto Networks device to connect to. - username : str - The API username for authenticating with the device. - password : str - The API password for authenticating with the device. - - Returns - ------- - PanDevice - A connected `PanDevice` instance, representing the target Palo Alto Networks device, fully initialized and ready for further API operations. + settings_file : LazySettings + The LazySettings object containing configurations loaded from the settings file. + settings_file_path : Path + The filesystem path to the settings.yaml file, which contains custom configuration settings. Example ------- Initializing the environment for a device: - >>> device = common_setup('10.0.0.1', 'apiuser', 'apipassword') - # Ensures necessary directories exist, logging is configured, and returns a connected `PanDevice` instance. + >>> common_setup( + settings_file=Dynaconf(settings_files=[str(SETTINGS_FILE_PATH)]), + settings_file_path=SETTINGS_FILE_PATH, + ) + # Ensures necessary directories exist, and logging is configured. Notes ----- - Directory setup is performed only once; existing directories are not modified. - Logging configuration affects the entire application's logging behavior; the log level can be overridden by `settings.yaml` if `SETTINGS_FILE_PATH` is detected in the function. - - A successful device connection is critical for the function to return; otherwise, it may raise exceptions based on connection issues. The ability to override default settings with `settings.yaml` is supported for the log level configuration in this function if `SETTINGS_FILE_PATH` is utilized within `configure_logging`. """ @@ -93,14 +85,6 @@ def common_setup( settings_file_path=settings_file_path, ) - # Connect to the device - device = connect_to_host( - hostname=hostname, - username=username, - password=password, - ) - return device - def connect_to_host( hostname: str, @@ -115,16 +99,16 @@ def connect_to_host( Parameters ---------- hostname : str - The hostname or IP address of the target Palo Alto Networks device. - api_username : str - The API username for authentication. - api_password : str - The password corresponding to the API username. + The network address or DNS name of the Palo Alto Networks device to connect to. + username : str + The API username for authenticating with the device. + password : str + The API password for authenticating with the device. Returns ------- PanDevice - A PanDevice object representing the connected device, which may be a Firewall or Panorama instance. + A PanDevice object representing the connected device, which may be a Firewall or Panorama instance, ready for further API operations. Raises ------ @@ -146,6 +130,7 @@ def connect_to_host( - Initiating a connection to a device is a prerequisite for performing any operational or configuration tasks via the API. - The function's error handling provides clear diagnostics, aiding in troubleshooting connection issues. - Configuration settings for the connection, such as timeout periods and retry attempts, can be customized through the `settings.yaml` file, if `settings_file_path` is utilized within the function. + - A successful device connection is critical for the function to return; otherwise, it may raise exceptions based on connection issues. """ try: diff --git a/pan_os_upgrade/components/utilities.py b/pan_os_upgrade/components/utilities.py index 73ec774..bf0406b 100644 --- a/pan_os_upgrade/components/utilities.py +++ b/pan_os_upgrade/components/utilities.py @@ -790,7 +790,7 @@ def get_emoji(action: str) -> str: return emoji_map.get(action, "") -def ip_callback(value: str) -> str: +def ip_callback(value: str) -> Union[str, None]: """ Validates the input as either a resolvable hostname or a valid IP address, intended for CLI input validation. @@ -825,6 +825,8 @@ def ip_callback(value: str) -> str: - The function's utility extends beyond mere validation, contributing to the tool's overall resilience and user-friendliness by preventing erroneous network operations. - Default settings can be overridden by configurations specified in a `settings.yaml` file if `SETTINGS_FILE_PATH` is used within the script, allowing for customized validation logic based on the application's needs. """ + if value is None: + return value # First, try to resolve as a hostname if resolve_hostname(hostname=value): diff --git a/pan_os_upgrade/main.py b/pan_os_upgrade/main.py index a5e58d9..a6ebe87 100644 --- a/pan_os_upgrade/main.py +++ b/pan_os_upgrade/main.py @@ -79,6 +79,7 @@ from pathlib import Path from threading import Lock from typing_extensions import Annotated +from typing import Optional # Palo Alto Networks imports from panos.firewall import Firewall @@ -93,6 +94,7 @@ from pan_os_upgrade.components.assurance import AssuranceOptions from pan_os_upgrade.components.device import ( common_setup, + connect_to_host, get_firewalls_from_panorama, threaded_get_firewall_details, ) @@ -148,7 +150,7 @@ def firewall( typer.Option( "--hostname", "-h", - help="Hostname or IP address of either Panorama or firewall appliance", + help="Hostname or IP address of a firewall appliance", prompt="Firewall hostname or IP", callback=ip_callback, ), @@ -181,15 +183,29 @@ def firewall( prompt="Target version", ), ], + peer_hostname: Annotated[ + Optional[str], + typer.Option( + "--peer", + help="Hostname or IP address of the peer firewall appliance in an HA pair, management IP for the peer is auto-detected if not set", + callback=ip_callback + ), + ] = None, dry_run: Annotated[ - bool, + Optional[bool], typer.Option( "--dry-run", "-d", help="Perform a dry run of all tests and downloads without performing the actual upgrade", - prompt="Dry Run?", ), - ] = True, + ] = None, + non_interactive: Annotated[ + bool, + typer.Option( + "--non-interactive", + help="Perform non-interactive upgrade with default options. Disables --dry-run option.", + ), + ] = False, ): """ Launches the upgrade process for a Palo Alto Networks firewall, facilitating a comprehensive and controlled upgrade workflow. @@ -206,8 +222,12 @@ def firewall( The corresponding password for the provided administrative username. target_version : str The version of PAN-OS to which the firewall is to be upgraded. Must be a valid and supported version for the device. + peer_hostname: str, optional + The IP address or DNS hostname of the peer firewall in an HA pair. Usable when auto-detected firewalll management IP is not accessible directly but have a NAT IP. dry_run : bool, optional - When set to True, the function performs all preparatory and validation steps without executing the actual upgrade, defaulting to False. + When set, the function performs all preparatory and validation steps without executing the actual upgrade. Dry run is the default selection in interactive mode. + non_interactive: bool, optional + When set, the function performs all the upgrade steps without any prompts. Dry run is disabled in non interactive mode. Examples -------- @@ -235,12 +255,23 @@ def firewall( typer.echo(banner) # Perform common setup tasks, return a connected device - device = common_setup( + common_setup( + settings_file=SETTINGS_FILE, + settings_file_path=SETTINGS_FILE_PATH, + ) + + if non_interactive: + logging.info( + f"{get_emoji(action='skipped')} Non-interactive mode is set, ignoring --dry-run option." + ) + dry_run = False # override dry run to false in non-interactive mode + elif dry_run is None: # if dry-run option is not set explicitly + dry_run = typer.confirm("Dry Run?", default=True) + + device = connect_to_host( hostname=hostname, username=username, password=password, - settings_file=SETTINGS_FILE, - settings_file_path=SETTINGS_FILE_PATH, ) firewall_objects_for_upgrade = [device] @@ -254,26 +285,27 @@ def firewall( # Store all peer-info details in a dictionary peer = ha_dict["result"]["group"]["peer-info"] - # Determine the peer's IP address if the mgmt-ip is not empty - if peer["mgmt-ip"] and len(peer["mgmt-ip"]) > 0: - peer["ip"] = peer["mgmt-ip"].split("/")[0] + if not peer_hostname: # if peer hostname is not specifically set, try to get it + # Determine the peer's IP address if the mgmt-ip is not empty + if peer["mgmt-ip"] and len(peer["mgmt-ip"]) > 0: + peer_hostname = peer["mgmt-ip"].split("/")[0] - # If the mgmt-ip is empty, use the mgmt-ipv6 field - elif peer["mgmt-ipv6"] and len(peer["mgmt-ipv6"]) > 0: - peer["ip"] = peer["mgmt-ipv6"].split("/")[0] + # If the mgmt-ip is empty, use the mgmt-ipv6 field + elif peer["mgmt-ipv6"] and len(peer["mgmt-ipv6"]) > 0: + peer_hostname = peer["mgmt-ipv6"].split("/")[0] - # If the mgmt-ip and mgmt-ipv6 fields are both empty, use the ha1-ipaddr field - elif peer["ha1-ipaddr"] and len(peer["ha1-ipaddr"]) > 0: - peer["ip"] = peer["ha1-ipaddr"] + # If the mgmt-ip and mgmt-ipv6 fields are both empty, use the ha1-ipaddr field + elif peer["ha1-ipaddr"] and len(peer["ha1-ipaddr"]) > 0: + peer_hostname = peer["ha1-ipaddr"] - else: - # no mgmt-ip or mgmt-ipv6 or ha1-ipaddr found, log message and sys.exit - logging.error( - f"{get_emoji(action='error')} {hostname}: No IP address found for the peer firewall. Exiting." - ) - sys.exit(1) + else: + # no mgmt-ip or mgmt-ipv6 or ha1-ipaddr found, log message and sys.exit + logging.error( + f"{get_emoji(action='error')} {hostname}: No IP address found for the peer firewall. Exiting." + ) + sys.exit(1) - firewall_objects_for_upgrade.append(Firewall(peer["ip"], username, password)) + firewall_objects_for_upgrade.append(Firewall(peer_hostname, username, password)) # First round of upgrades, targeting all firewalls and placing active firewalls in an HA pair on a revisit list with ThreadPoolExecutor(max_workers=2) as executor: @@ -441,12 +473,15 @@ def panorama( typer.echo(banner) # Perform common setup tasks, return a connected device - device = common_setup( + common_setup( + settings_file=SETTINGS_FILE, + settings_file_path=SETTINGS_FILE_PATH, + ) + + device = connect_to_host( hostname=hostname, username=username, password=password, - settings_file=SETTINGS_FILE, - settings_file_path=SETTINGS_FILE_PATH, ) panorama_objects_for_upgrade = [device] @@ -590,15 +625,28 @@ def batch( ), ], dry_run: Annotated[ - bool, + Optional[bool], typer.Option( "--dry-run", "-d", help="Perform a dry run of all tests and downloads without performing the actual upgrade", - prompt="Dry Run?", - is_flag=True, ), - ] = True, + ] = None, + non_interactive: Annotated[ + bool, + typer.Option( + "--non-interactive", + help="Perform non-interactive upgrade with default options. Disables --dry-run option.", + ), + ] = False, + inventory: Annotated[ + Optional[str], + typer.Option( + "--inventory", + "-i", + help="Preset firewall inventory to be upgraded as a comma delimited list of firewall hostnames. Takes precedence over inventory file. Provide both peers for an HA pair.", + ), + ] = None, ): """ Orchestrates a batch upgrade process for firewalls under Panorama's management. This command leverages Panorama @@ -622,7 +670,11 @@ def batch( target_version : str The version of PAN-OS to which the firewalls should be upgraded. dry_run : bool, optional - If set, the command simulates the upgrade process without making any changes to the devices. Defaults to True, meaning dry run is enabled by default. + If set, the command simulates the upgrade process without making any changes to the devices. Dry run is the default selection in interactive mode. + non_interactive: bool, optional + When set, the function performs all the upgrade steps without any prompts. Dry run is disabled in non interactive mode. + inventory: str, optional + Comma delimited list of firewall hostnames to be upgraded, it takes precedence over inventory file and avoids interactive inventory selection. Usable with non-interactive mode to provide firewalls as a CLI option. Both peers of an HA pair need to provided on HA upgrades. Examples -------- @@ -665,12 +717,23 @@ def batch( typer.echo(banner) # Perform common setup tasks, return a connected device - device = common_setup( + common_setup( + settings_file=SETTINGS_FILE, + settings_file_path=SETTINGS_FILE_PATH, + ) + + if non_interactive: + logging.info( + f"{get_emoji(action='skipped')} Non-interactive mode is set, ignoring --dry-run option." + ) + dry_run = False # override dry run to false in non-interactive mode + elif dry_run is None: # if dry-run option is not set explicitly + dry_run = typer.confirm("Dry Run?", default=True) + + device = connect_to_host( hostname=hostname, username=username, password=password, - settings_file=SETTINGS_FILE, - settings_file_path=SETTINGS_FILE_PATH, ) # Exit script if device is Firewall (batch upgrade is only supported when connecting to Panorama) @@ -703,24 +766,40 @@ def batch( firewalls_info=firewalls_info, ) + # --inventory cli option takes precedence over inventory file + if inventory: + user_selected_hostnames = inventory.split(",") + # Check if inventory.yaml exists and if it does, read the selected devices - if INVENTORY_FILE_PATH.exists(): + elif INVENTORY_FILE_PATH.exists(): with open(INVENTORY_FILE_PATH, "r") as file: inventory_data = yaml.safe_load(file) user_selected_hostnames = inventory_data.get("firewalls_to_upgrade", []) # If inventory.yaml does not exist, then prompt the user to select devices else: - # Present a table of firewalls with detailed system information for selection + # Present a table of firewalls with detailed system information for selection - skipped in non interactive mode user_selected_hostnames = select_devices_from_table( firewall_mapping=firewall_mapping + ) if not non_interactive else [] + + firewalls_to_upgrade = { + hostname: firewall_mapping[hostname]['ip-address'] + for hostname in user_selected_hostnames + if hostname in firewall_mapping + } + + absent_firewalls = set(user_selected_hostnames) - set(firewalls_to_upgrade) + if absent_firewalls: + logging.error( + f"{get_emoji(action='error')} Firewalls {list(absent_firewalls)} in inventory are absent in Panorama. Exiting." ) + sys.exit(1) # Extracting the Firewall objects from the filtered mapping firewall_objects_for_upgrade = [ firewall_mapping[hostname]["object"] - for hostname in user_selected_hostnames - if hostname in firewall_mapping + for hostname in firewalls_to_upgrade ] logging.info( f"{get_emoji(action='working')} {hostname}: Selected {len(firewall_objects_for_upgrade)} firewalls from inventory.yaml for upgrade." @@ -738,8 +817,8 @@ def batch( firewall_list = "\n".join( [ - f" - {firewall_mapping[hostname]['hostname']} ({firewall_mapping[hostname]['ip-address']})" - for hostname in user_selected_hostnames + f" - {hostname} ({ipaddr})" + for hostname,ipaddr in firewalls_to_upgrade.items() ] ) @@ -747,8 +826,13 @@ def batch( f"{get_emoji(action='report')} {hostname}: Please confirm the selected firewalls:\n{firewall_list}" ) - # Asking for user confirmation before proceeding - if dry_run: + # Proceed with upgrade if in non-interactive mode otherwise ask for user confirmation before proceeding + if non_interactive: + typer.echo( + f"{get_emoji(action='warning')} {hostname}: Non interactive mode is enabled, upgrade workflow will be executed without confirmation." + ) + confirmation = True + elif dry_run: typer.echo( f"{get_emoji(action='warning')} {hostname}: Dry run mode is enabled, upgrade workflow will be skipped." ) @@ -927,12 +1011,15 @@ def inventory( banner = console_welcome_banner(mode="inventory") typer.echo(banner) - device = common_setup( + common_setup( + settings_file=SETTINGS_FILE, + settings_file_path=SETTINGS_FILE_PATH, + ) + + device = connect_to_host( hostname=hostname, username=username, password=password, - settings_file=SETTINGS_FILE, - settings_file_path=SETTINGS_FILE_PATH, ) if type(device) is Firewall: