diff --git a/pan_os_upgrade/components/utilities.py b/pan_os_upgrade/components/utilities.py index 73ec774..bf0406b 100644 --- a/pan_os_upgrade/components/utilities.py +++ b/pan_os_upgrade/components/utilities.py @@ -790,7 +790,7 @@ def get_emoji(action: str) -> str: return emoji_map.get(action, "") -def ip_callback(value: str) -> str: +def ip_callback(value: str) -> Union[str, None]: """ Validates the input as either a resolvable hostname or a valid IP address, intended for CLI input validation. @@ -825,6 +825,8 @@ def ip_callback(value: str) -> str: - The function's utility extends beyond mere validation, contributing to the tool's overall resilience and user-friendliness by preventing erroneous network operations. - Default settings can be overridden by configurations specified in a `settings.yaml` file if `SETTINGS_FILE_PATH` is used within the script, allowing for customized validation logic based on the application's needs. """ + if value is None: + return value # First, try to resolve as a hostname if resolve_hostname(hostname=value): diff --git a/pan_os_upgrade/main.py b/pan_os_upgrade/main.py index 90813f8..a6ebe87 100644 --- a/pan_os_upgrade/main.py +++ b/pan_os_upgrade/main.py @@ -150,7 +150,7 @@ def firewall( typer.Option( "--hostname", "-h", - help="Hostname or IP address of either Panorama or firewall appliance", + help="Hostname or IP address of a firewall appliance", prompt="Firewall hostname or IP", callback=ip_callback, ), @@ -183,6 +183,14 @@ def firewall( prompt="Target version", ), ], + peer_hostname: Annotated[ + Optional[str], + typer.Option( + "--peer", + help="Hostname or IP address of the peer firewall appliance in an HA pair, management IP for the peer is auto-detected if not set", + callback=ip_callback + ), + ] = None, dry_run: Annotated[ Optional[bool], typer.Option( @@ -214,6 +222,8 @@ def firewall( The corresponding password for the provided administrative username. target_version : str The version of PAN-OS to which the firewall is to be upgraded. Must be a valid and supported version for the device. + peer_hostname: str, optional + The IP address or DNS hostname of the peer firewall in an HA pair. Usable when auto-detected firewalll management IP is not accessible directly but have a NAT IP. dry_run : bool, optional When set, the function performs all preparatory and validation steps without executing the actual upgrade. Dry run is the default selection in interactive mode. non_interactive: bool, optional @@ -275,26 +285,27 @@ def firewall( # Store all peer-info details in a dictionary peer = ha_dict["result"]["group"]["peer-info"] - # Determine the peer's IP address if the mgmt-ip is not empty - if peer["mgmt-ip"] and len(peer["mgmt-ip"]) > 0: - peer["ip"] = peer["mgmt-ip"].split("/")[0] + if not peer_hostname: # if peer hostname is not specifically set, try to get it + # Determine the peer's IP address if the mgmt-ip is not empty + if peer["mgmt-ip"] and len(peer["mgmt-ip"]) > 0: + peer_hostname = peer["mgmt-ip"].split("/")[0] - # If the mgmt-ip is empty, use the mgmt-ipv6 field - elif peer["mgmt-ipv6"] and len(peer["mgmt-ipv6"]) > 0: - peer["ip"] = peer["mgmt-ipv6"].split("/")[0] + # If the mgmt-ip is empty, use the mgmt-ipv6 field + elif peer["mgmt-ipv6"] and len(peer["mgmt-ipv6"]) > 0: + peer_hostname = peer["mgmt-ipv6"].split("/")[0] - # If the mgmt-ip and mgmt-ipv6 fields are both empty, use the ha1-ipaddr field - elif peer["ha1-ipaddr"] and len(peer["ha1-ipaddr"]) > 0: - peer["ip"] = peer["ha1-ipaddr"] + # If the mgmt-ip and mgmt-ipv6 fields are both empty, use the ha1-ipaddr field + elif peer["ha1-ipaddr"] and len(peer["ha1-ipaddr"]) > 0: + peer_hostname = peer["ha1-ipaddr"] - else: - # no mgmt-ip or mgmt-ipv6 or ha1-ipaddr found, log message and sys.exit - logging.error( - f"{get_emoji(action='error')} {hostname}: No IP address found for the peer firewall. Exiting." - ) - sys.exit(1) + else: + # no mgmt-ip or mgmt-ipv6 or ha1-ipaddr found, log message and sys.exit + logging.error( + f"{get_emoji(action='error')} {hostname}: No IP address found for the peer firewall. Exiting." + ) + sys.exit(1) - firewall_objects_for_upgrade.append(Firewall(peer["ip"], username, password)) + firewall_objects_for_upgrade.append(Firewall(peer_hostname, username, password)) # First round of upgrades, targeting all firewalls and placing active firewalls in an HA pair on a revisit list with ThreadPoolExecutor(max_workers=2) as executor: @@ -628,6 +639,14 @@ def batch( help="Perform non-interactive upgrade with default options. Disables --dry-run option.", ), ] = False, + inventory: Annotated[ + Optional[str], + typer.Option( + "--inventory", + "-i", + help="Preset firewall inventory to be upgraded as a comma delimited list of firewall hostnames. Takes precedence over inventory file. Provide both peers for an HA pair.", + ), + ] = None, ): """ Orchestrates a batch upgrade process for firewalls under Panorama's management. This command leverages Panorama @@ -654,6 +673,8 @@ def batch( If set, the command simulates the upgrade process without making any changes to the devices. Dry run is the default selection in interactive mode. non_interactive: bool, optional When set, the function performs all the upgrade steps without any prompts. Dry run is disabled in non interactive mode. + inventory: str, optional + Comma delimited list of firewall hostnames to be upgraded, it takes precedence over inventory file and avoids interactive inventory selection. Usable with non-interactive mode to provide firewalls as a CLI option. Both peers of an HA pair need to provided on HA upgrades. Examples -------- @@ -745,8 +766,12 @@ def batch( firewalls_info=firewalls_info, ) + # --inventory cli option takes precedence over inventory file + if inventory: + user_selected_hostnames = inventory.split(",") + # Check if inventory.yaml exists and if it does, read the selected devices - if INVENTORY_FILE_PATH.exists(): + elif INVENTORY_FILE_PATH.exists(): with open(INVENTORY_FILE_PATH, "r") as file: inventory_data = yaml.safe_load(file) user_selected_hostnames = inventory_data.get("firewalls_to_upgrade", []) @@ -757,13 +782,24 @@ def batch( user_selected_hostnames = select_devices_from_table( firewall_mapping=firewall_mapping ) if not non_interactive else [] + + firewalls_to_upgrade = { + hostname: firewall_mapping[hostname]['ip-address'] + for hostname in user_selected_hostnames + if hostname in firewall_mapping + } + + absent_firewalls = set(user_selected_hostnames) - set(firewalls_to_upgrade) + if absent_firewalls: + logging.error( + f"{get_emoji(action='error')} Firewalls {list(absent_firewalls)} in inventory are absent in Panorama. Exiting." ) + sys.exit(1) # Extracting the Firewall objects from the filtered mapping firewall_objects_for_upgrade = [ firewall_mapping[hostname]["object"] - for hostname in user_selected_hostnames - if hostname in firewall_mapping + for hostname in firewalls_to_upgrade ] logging.info( f"{get_emoji(action='working')} {hostname}: Selected {len(firewall_objects_for_upgrade)} firewalls from inventory.yaml for upgrade." @@ -781,8 +817,8 @@ def batch( firewall_list = "\n".join( [ - f" - {firewall_mapping[hostname]['hostname']} ({firewall_mapping[hostname]['ip-address']})" - for hostname in user_selected_hostnames + f" - {hostname} ({ipaddr})" + for hostname,ipaddr in firewalls_to_upgrade.items() ] )