diff --git a/README.md b/README.md index c528772..9cceaf2 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@
Logo -

PAN-OS Automation Project

+

PAN-OS and Panorama Upgrade Automation

Streamlining Palo Alto Networks Firewall Upgrades with Python Automation
diff --git a/docker/Dockerfile b/docker/Dockerfile index 8cab038..d037079 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -14,8 +14,8 @@ WORKDIR /app ADD settings.yaml /app # Install any needed packages specified in requirements.txt -# Note: The requirements.txt should contain pan-os-upgrade==1.2.5 -RUN pip install --no-cache-dir pan-os-upgrade==1.2.5 +# Note: The requirements.txt should contain pan-os-upgrade==1.2.6 +RUN pip install --no-cache-dir pan-os-upgrade==1.2.6 # Set the locale to avoid issues with emoji rendering ENV LANG C.UTF-8 diff --git a/docs/about/contributing.md b/docs/about/contributing.md index ec0b522..9f9454d 100644 --- a/docs/about/contributing.md +++ b/docs/about/contributing.md @@ -1,6 +1,6 @@ -# Contributing to PAN-OS Automation Project +# Contributing to `pan-os-upgrade` -We're thrilled that you're interested in contributing to the PAN-OS Automation Project! Your contributions are essential for making this project better and more effective. Whether you're fixing a bug, adding a new feature, improving the documentation, or just giving suggestions, every contribution is valuable. +We're thrilled that you're interested in contributing to the `pan-os-upgrade` project! Your contributions are essential for making this project better and more effective. Whether you're fixing a bug, adding a new feature, improving the documentation, or just giving suggestions, every contribution is valuable. --- @@ -10,7 +10,7 @@ Before you begin, make sure you have a GitHub account and are familiar with Git ### Setting Up Your Environment -1. **Fork the Repository:** Start by forking the [PAN-OS Automation Project repository](https://github.com/cdot65/pan-os-upgrade) on GitHub. +1. **Fork the Repository:** Start by forking the [pan-os-upgrade repository](https://github.com/cdot65/pan-os-upgrade) on GitHub. 2. **Clone Your Fork:** Clone your fork to your local machine: @@ -51,7 +51,7 @@ Before you begin, make sure you have a GitHub account and are familiar with Git git push origin feature/YourFeatureName ``` -2. **Create a Pull Request:** Go to the original PAN-OS Automation Project repository on GitHub and create a new pull request. Provide a clear description of your changes and any relevant issue numbers. +2. **Create a Pull Request:** Go to the original `pan-os-upgrade` repository on GitHub and create a new pull request. Provide a clear description of your changes and any relevant issue numbers. 3. **Code Review:** Wait for a review from the project maintainers. Be open to feedback and make any requested changes. diff --git a/docs/about/release-notes.md b/docs/about/release-notes.md index da6c0cd..24be609 100644 --- a/docs/about/release-notes.md +++ b/docs/about/release-notes.md @@ -2,6 +2,16 @@ Welcome to the release notes for the `pan-os-upgrade` tool. This document provides a detailed record of changes, enhancements, and fixes in each version of the tool. +## Version 1.2.6 + +**Release Date:** *<20240217>* + + +### What's New + +- Move HA status collection within the `get_firewall_details` function for batch upgrades. +- Update interactive inventory menu to include HA role of the firewall and if it has `preempt` enabled. + ## Version 1.2.5 **Release Date:** *<20240215>* diff --git a/pan_os_upgrade/upgrade.py b/pan_os_upgrade/upgrade.py index f6c6879..1155c3a 100644 --- a/pan_os_upgrade/upgrade.py +++ b/pan_os_upgrade/upgrade.py @@ -3473,40 +3473,35 @@ def get_emoji(action: str) -> str: return emoji_map.get(action, "") -def fetch_firewall_info(firewall: Firewall) -> Dict[str, Any]: +def get_firewall_details(firewall: Firewall) -> Dict[str, Any]: """ - Retrieves detailed system information from a specified firewall device and organizes it into a dictionary. + Retrieves detailed system and High Availability (HA) status information from a specified firewall device and organizes it into a dictionary. - This function communicates with the firewall to gather essential system details such as hostname, IP address, - model, serial number, software version, and application version. It's designed to support diagnostics, inventory - management, and operational monitoring by providing a snapshot of the firewall's current state and configuration. + This function establishes communication with the firewall to collect critical system details and HA status, such as hostname, IP address, model, serial number, software version, application version, and HA configuration. It is designed to assist in diagnostics, inventory management, operational monitoring, and checking the HA status by providing a comprehensive overview of the firewall's current operational state, configuration, and HA status. Parameters ---------- firewall : Firewall - The Firewall instance from which system information will be fetched. This object should be initialized with - the necessary authentication credentials and network details to facilitate API communication with the firewall. + The Firewall instance from which to fetch system information and HA status. This object must be initialized with the necessary authentication credentials and network details to enable API communication with the firewall. Returns ------- Dict[str, Any] - A dictionary containing key system information elements of the firewall, such as hostname, IP address, model, - serial number, software version, and application version. If the function encounters an error during information - retrieval, it returns a dictionary with the available data and marks the status as "Offline or Unavailable". + A dictionary containing key elements of the firewall's system information, such as hostname, IP address, model, serial number, software version, application version, and HA status. If an error occurs during information retrieval, the function returns a dictionary with the data available up to the point of failure and marks the status as "Offline or Unavailable". Example ------- - Fetching system information for a firewall: + Fetching system and HA status information for a firewall: >>> firewall_instance = Firewall(hostname='192.168.1.1', api_username='admin', api_password='admin') - >>> firewall_info = fetch_firewall_info(firewall_instance) + >>> firewall_info = get_firewall_details(firewall_instance) >>> print(firewall_info) - {'hostname': 'fw-hostname', 'ip-address': '192.168.1.1', 'model': 'PA-850', 'serial': '0123456789', 'sw-version': '10.0.0', 'app-version': '8200-1234'} + {'hostname': 'fw-hostname', 'ip-address': '192.168.1.1', 'model': 'PA-850', 'serial': '0123456789', + 'sw-version': '10.0.0', 'app-version': '8200-1234', 'ha-mode': 'active/passive', 'ha-details': {...}} Notes ----- - - This function is intended for use in environments where firewall configuration and status monitoring is necessary. - - Error handling is implemented to ensure that partial or default information is returned if the firewall is unreachable - or if any issues arise during the data retrieval process, allowing for graceful degradation of functionality. + - The function is aimed at scenarios requiring firewall configuration, status monitoring, and HA status checks. + - Error handling is in place to ensure that, in the event the firewall is unreachable or if any issues occur during data retrieval, partial or default information is returned. This allows for graceful degradation of functionality and ensures operational continuity. """ # Ensure a safe operation by working with a copy of the firewall object fw_copy = copy.deepcopy(firewall) @@ -3514,8 +3509,7 @@ def fetch_firewall_info(firewall: Firewall) -> Dict[str, Any]: try: # Attempt to retrieve system information from the firewall info = fw_copy.show_system_info() - # Organize and return the fetched information as a dictionary - return { + system_info = { "hostname": info["system"]["hostname"], "ip-address": info["system"]["ip-address"], "model": info["system"]["model"], @@ -3524,9 +3518,9 @@ def fetch_firewall_info(firewall: Firewall) -> Dict[str, Any]: "app-version": info["system"]["app-version"], } except Exception as e: - # Log and return default values in case of an error - logging.error(f"Error retrieving info for {fw_copy.serial}: {str(e)}") - return { + # Log and return default values in case of an error for system info + logging.error(f"Error retrieving system info for {fw_copy.serial}: {str(e)}") + system_info = { "hostname": fw_copy.hostname or "Unknown", "ip-address": "N/A", "model": "N/A", @@ -3536,6 +3530,27 @@ def fetch_firewall_info(firewall: Firewall) -> Dict[str, Any]: "status": "Offline or Unavailable", } + try: + # Retrieve HA status and details + deploy_info, ha_details = get_ha_status( + firewall, system_info.get("hostname", "") + ) + ha_info = { + "ha-mode": deploy_info, + "ha-details": ha_details, + } + except Exception as e: + # Log and return default values in case of an error for HA info + logging.error(f"Error retrieving HA info for {fw_copy.serial}: {str(e)}") + ha_info = { + "ha-mode": "N/A", + "ha-details": None, + } + + # Merge system info and HA info into a single dictionary + firewall_info = {**system_info, **ha_info} + return firewall_info + def get_firewalls_from_panorama(panorama: Panorama) -> list[Firewall]: """ @@ -3578,7 +3593,7 @@ def get_firewalls_from_panorama(panorama: Panorama) -> list[Firewall]: return firewalls -def get_firewalls_info(firewalls: List[Firewall]) -> List[Dict[str, Any]]: +def threaded_get_firewall_details(firewalls: List[Firewall]) -> List[Dict[str, Any]]: """ Retrieves detailed system information for a list of firewalls using concurrent executions to improve efficiency. @@ -3597,28 +3612,28 @@ def get_firewalls_info(firewalls: List[Firewall]) -> List[Dict[str, Any]]: ------- List[Dict[str, Any]] A list of dictionaries, with each dictionary containing system information for a respective firewall. The - structure and content of these dictionaries depend on the implementation of the `fetch_firewall_info` function + structure and content of these dictionaries depend on the implementation of the `get_firewall_details` function but typically include keys such as 'hostname', 'version', 'serial number', etc. Example ------- Fetching information for a list of firewall objects: >>> firewalls = [Firewall('192.168.1.1', api_key='apikey1'), Firewall('192.168.1.2', api_key='apikey2')] - >>> info = get_firewalls_info(firewalls) + >>> info = threaded_get_firewall_details(firewalls) # This returns a list of dictionaries, each containing information about a firewall. Notes ----- - This function leverages concurrent threads to fetch data, significantly reducing the total time required to obtain information from multiple devices. - - The actual data fetched and the structure of the returned dictionaries are determined by the `fetch_firewall_info` + - The actual data fetched and the structure of the returned dictionaries are determined by the `get_firewall_details` function, which this function depends on. """ firewalls_info = [] with ThreadPoolExecutor(max_workers=10) as executor: # Creating a future for each firewall info fetch task future_to_firewall_info = { - executor.submit(fetch_firewall_info, fw): fw for fw in firewalls + executor.submit(get_firewall_details, fw): fw for fw in firewalls } # Iterating over completed fetch tasks and collecting their results @@ -3665,7 +3680,7 @@ def get_managed_devices( """ managed_devices = model_from_api_response( - panorama.op("show devices all"), ManagedDevices + panorama.op("show devices connected"), ManagedDevices ) devices = managed_devices.devices @@ -3898,19 +3913,14 @@ def resolve_hostname(hostname: str) -> bool: def select_devices_from_table(firewall_mapping: dict) -> List[str]: """ - Displays a table of firewalls and prompts the user to select devices for further operations. This selection - process allows the user to specify one or more devices by their listing numbers, a range, or a combination - thereof. The function then returns a list of hostnames corresponding to the user's selections. + Presents a table of firewalls, including details such as hostname, IP address, model, serial number, software version, and HA mode, and prompts the user to select devices for further operations. Users can select devices by their listing numbers, a range, or a combination thereof. The function returns a list of hostnames corresponding to the user's selections. - This interactive step is crucial for operations that target multiple devices, enabling precise control over - which devices are included. The function ensures that selections are valid and within the range of displayed - devices, providing feedback for any invalid entries. + This interactive step is crucial for operations targeting multiple devices, as it provides users with precise control over which devices are included. The function ensures that selections are valid and within the range of displayed devices, providing feedback on any invalid entries. Parameters ---------- firewall_mapping : dict - A mapping from device hostnames to their respective details (e.g., IP address, model, serial number), - used to generate the selection table. + A dictionary mapping device hostnames to their respective details, which includes the firewall object, IP address, model, serial number, software version, application version, HA mode, and HA details. This information is used to generate the selection table. Returns ------- @@ -3919,39 +3929,70 @@ def select_devices_from_table(firewall_mapping: dict) -> List[str]: Examples -------- - Presenting a selection table and capturing user choices: + Displaying a selection table and capturing user choices: >>> firewall_mapping = { - ... 'fw1': {'ip-address': '10.1.1.1', 'model': 'PA-850', 'serial': '0123456789', 'sw-version': '9.1.0', 'app-version': '9.1.0'}, - ... 'fw2': {'ip-address': '10.1.1.2', 'model': 'PA-220', 'serial': '9876543210', 'sw-version': '9.1.2', 'app-version': '9.1.3'} + ... 'fw1': { + ... 'object': , + ... 'hostname': 'fw1', + ... 'ip-address': '10.1.1.1', + ... 'model': 'PA-850', + ... 'serial': '0123456789', + ... 'sw-version': '9.1.0', + ... 'app-version': '9.1.0', + ... 'ha-mode': 'active/passive', + ... 'ha-details': None, + ... }, + ... 'fw2': { + ... 'object': , + ... 'hostname': 'fw2', + ... 'ip-address': '10.1.1.2', + ... 'model': 'PA-220', + ... 'serial': '9876543210', + ... 'sw-version': '9.1.2', + ... 'app-version': '9.1.3', + ... 'ha-mode': 'active/active', + ... 'ha-details': {...}, + ... }, ... } >>> selected_hostnames = select_devices_from_table(firewall_mapping) - # User is prompted to select from the table of devices. The function returns the hostnames of selected devices. + # The user is prompted to select from the table. The function returns the hostnames of the selected devices. Notes ----- - - The function leverages the `tabulate` library to present a well-structured table, enhancing readability and - ease of selection. - - It accommodates various input formats for selecting devices, including individual numbers, ranges (e.g., 2-4), - or a comma-separated list, providing flexibility in selection methodology. - - Invalid selections (e.g., out-of-range numbers or incorrect formats) are handled gracefully, with prompts for - correction, ensuring a robust and user-friendly selection process. + - Utilizes the `tabulate` library to display a structured and readable table for device selection. + - Supports various input formats for device selection, such as individual numbers, ranges (e.g., '2-4'), or a comma-separated list, offering flexibility in selection methodology. + - Gracefully handles invalid selections with prompts for correction, ensuring a user-friendly selection process. """ # Sort firewalls by hostname for consistent display sorted_firewall_items = sorted(firewall_mapping.items(), key=lambda item: item[0]) - devices_table = [ - [ - Fore.CYAN + str(i + 1) + Fore.RESET, - details["hostname"], - details["ip-address"], - details["model"], - details["serial"], - details["sw-version"], - details["app-version"], - ] - for i, (hostname, details) in enumerate(sorted_firewall_items) - ] + devices_table = [] + for i, (hostname, details) in enumerate(sorted_firewall_items): + preemptive_status = "N/A" + if details["ha-details"] is not None: + preemptive_status = ( + details["ha-details"] + .get("result", {}) + .get("group", {}) + .get("local-info", {}) + .get("preemptive", "N/A") + ) + + # Using 'hostname' to add an entry to the 'devices_table' + devices_table.append( + [ + Fore.CYAN + str(i + 1) + Fore.RESET, + hostname, + details["ip-address"], + details["model"], + # details["serial"], + details["sw-version"], + details["app-version"], + details["ha-mode"], + preemptive_status, + ] + ) typer.echo( tabulate( @@ -3961,9 +4002,11 @@ def select_devices_from_table(firewall_mapping: dict) -> List[str]: Fore.GREEN + "Hostname" + Fore.RESET, Fore.GREEN + "IP Address" + Fore.RESET, Fore.GREEN + "Model" + Fore.RESET, - Fore.GREEN + "Serial" + Fore.RESET, - Fore.GREEN + "SW Version" + Fore.RESET, - Fore.GREEN + "App Version" + Fore.RESET, + # Fore.GREEN + "Serial" + Fore.RESET, + Fore.GREEN + "PAN-OS" + Fore.RESET, + Fore.GREEN + "Content" + Fore.RESET, + Fore.GREEN + "HA Mode" + Fore.RESET, + Fore.GREEN + "Preempt" + Fore.RESET, ], tablefmt="fancy_grid", ) @@ -4025,16 +4068,56 @@ def select_devices_from_table(firewall_mapping: dict) -> List[str]: hostname, details = sorted_firewall_items[index] if hostname not in user_selected_hostnames: user_selected_hostnames.append(hostname) - typer.echo(Fore.GREEN + f"{hostname} selected." + Fore.RESET) + typer.echo(Fore.GREEN + f" - {hostname} selected." + Fore.RESET) else: typer.echo( - Fore.YELLOW + f"{hostname} is already selected." + Fore.RESET + Fore.YELLOW + + f" - {hostname} is already selected." + + Fore.RESET ) else: typer.echo( Fore.RED + f"Selection '{index + 1}' is out of range." + Fore.RESET ) + # New code to check for preemptive="yes" and prompt user + preemptive_firewalls = [] + for hostname in user_selected_hostnames: + details = firewall_mapping.get(hostname, {}) + ha_details = details.get("ha-details", {}) + if ha_details: + preemptive_status = ( + ha_details.get("result", {}) + .get("group", {}) + .get("local-info", {}) + .get("preemptive", "no") + ) + if preemptive_status.lower() == "yes": + preemptive_firewalls.append(hostname) + + if preemptive_firewalls: + typer.echo( + Fore.RED + + f"Warning: Firewalls {', '.join(preemptive_firewalls)} have 'preempt' enabled, this can cause an interruption." + + Fore.RESET + ) + confirmation = typer.prompt( + Fore.YELLOW + + "Are you sure that you want to add these firewalls to the upgrade list? (y/n)" + + Fore.RESET + ) + if confirmation.lower() != "y": + user_selected_hostnames = [ + hostname + for hostname in user_selected_hostnames + if hostname not in preemptive_firewalls + ] + typer.echo( + Fore.GREEN + + "Firewalls with 'preempt' set to 'yes' have been excluded." + + Fore.RESET + ) + return user_selected_hostnames @@ -4484,7 +4567,7 @@ def batch( logging.info( f"{get_emoji('working')} {hostname}: Retrieving detailed information of each firewall..." ) - firewalls_info = get_firewalls_info(all_firewalls) + firewalls_info = threaded_get_firewall_details(all_firewalls) # Create a mapping of firewalls for selection firewall_mapping = create_firewall_mapping(all_firewalls, firewalls_info) @@ -4544,7 +4627,8 @@ def batch( f"{get_emoji('warning')} {hostname}: Dry run mode is disabled, upgrade workflow will be executed." ) confirmation = typer.confirm( - "Do you want to proceed with the upgrade?", abort=True + f"{get_emoji('report')} {hostname}: Do you want to proceed with the upgrade?", + abort=True, ) typer.echo(f"{get_emoji('start')} Proceeding with the upgrade...") @@ -4719,7 +4803,7 @@ def inventory( logging.info( f"{get_emoji('working')} {hostname}: Retrieving detailed information of each firewall..." ) - firewalls_info = get_firewalls_info(all_firewalls) + firewalls_info = threaded_get_firewall_details(all_firewalls) # Create a mapping of firewalls for selection firewall_mapping = create_firewall_mapping(all_firewalls, firewalls_info) diff --git a/pyproject.toml b/pyproject.toml index 83ce2a0..5ab8ac2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pan-os-upgrade" -version = "1.2.5" +version = "1.2.6" description = "Python script to automate the upgrade process of PAN-OS firewalls." authors = ["Calvin Remsburg "] documentation = "https://cdot65.github.io/pan-os-upgrade/" diff --git a/tests/test_create_firewall_mapping.py b/tests/test_create_firewall_mapping.py index c2bb6c2..95d57a1 100644 --- a/tests/test_create_firewall_mapping.py +++ b/tests/test_create_firewall_mapping.py @@ -4,7 +4,7 @@ from pan_os_upgrade.upgrade import ( connect_to_host, get_firewalls_from_panorama, - get_firewalls_info, + threaded_get_firewall_details, create_firewall_mapping, ) from panos.firewall import Firewall @@ -79,7 +79,7 @@ def test_create_firewall_mapping_integration(): assert all_firewalls, "No firewalls retrieved from Panorama" # Fetch detailed information for each firewall - firewalls_info = get_firewalls_info(all_firewalls) + firewalls_info = threaded_get_firewall_details(all_firewalls) # Ensure that information is retrieved for each firewall assert firewalls_info, "Failed to retrieve firewall information" diff --git a/tests/test_get_ha_status.py b/tests/test_get_ha_status.py index c4d26cb..0d0a363 100644 --- a/tests/test_get_ha_status.py +++ b/tests/test_get_ha_status.py @@ -11,9 +11,9 @@ ("panorama.cdot.io", "disabled", None), ("panorama1.cdot.io", "primary-active", None), ("panorama2.cdot.io", "secondary-passive", None), - ("houston.cdot.io", "disabled", None), - ("woodlands-fw1.cdot.io", "active", None), - ("woodlands-fw2.cdot.io", "passive", None), + ("lab-fw1.cdot.io", "disabled", None), + ("lab-fw6.cdot.io", "active", None), + ("lab-fw7.cdot.io", "passive", None), ]