Skip to content

Commit

Permalink
feat: firewall/batch CLI options to set firewalls to be upgraded
Browse files Browse the repository at this point in the history
`--peer` CLI option introduced in `firewall` command in order to allow
specifying peer hostname/IP if auto-detected management IP is not accessible.

`--inventory` CLI option introduced in `batch` command to allow specifying
the list of firewalls to upgrade in CLI.
  • Loading branch information
alperenkose committed Apr 17, 2024
1 parent b19ddf6 commit ee7aefa
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 23 deletions.
4 changes: 3 additions & 1 deletion pan_os_upgrade/components/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ def get_emoji(action: str) -> str:
return emoji_map.get(action, "")


def ip_callback(value: str) -> str:
def ip_callback(value: str) -> Union[str, None]:
"""
Validates the input as either a resolvable hostname or a valid IP address, intended for CLI input validation.
Expand Down Expand Up @@ -825,6 +825,8 @@ def ip_callback(value: str) -> str:
- The function's utility extends beyond mere validation, contributing to the tool's overall resilience and user-friendliness by preventing erroneous network operations.
- Default settings can be overridden by configurations specified in a `settings.yaml` file if `SETTINGS_FILE_PATH` is used within the script, allowing for customized validation logic based on the application's needs.
"""
if value is None:
return value

# First, try to resolve as a hostname
if resolve_hostname(hostname=value):
Expand Down
80 changes: 58 additions & 22 deletions pan_os_upgrade/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def firewall(
typer.Option(
"--hostname",
"-h",
help="Hostname or IP address of either Panorama or firewall appliance",
help="Hostname or IP address of a firewall appliance",
prompt="Firewall hostname or IP",
callback=ip_callback,
),
Expand Down Expand Up @@ -183,6 +183,14 @@ def firewall(
prompt="Target version",
),
],
peer_hostname: Annotated[
Optional[str],
typer.Option(
"--peer",
help="Hostname or IP address of the peer firewall appliance in an HA pair, management IP for the peer is auto-detected if not set",
callback=ip_callback
),
] = None,
dry_run: Annotated[
Optional[bool],
typer.Option(
Expand Down Expand Up @@ -214,6 +222,8 @@ def firewall(
The corresponding password for the provided administrative username.
target_version : str
The version of PAN-OS to which the firewall is to be upgraded. Must be a valid and supported version for the device.
peer_hostname: str, optional
The IP address or DNS hostname of the peer firewall in an HA pair. Usable when auto-detected firewalll management IP is not accessible directly but have a NAT IP.
dry_run : bool, optional
When set, the function performs all preparatory and validation steps without executing the actual upgrade. Dry run is the default selection in interactive mode.
non_interactive: bool, optional
Expand Down Expand Up @@ -275,26 +285,27 @@ def firewall(
# Store all peer-info details in a dictionary
peer = ha_dict["result"]["group"]["peer-info"]

# Determine the peer's IP address if the mgmt-ip is not empty
if peer["mgmt-ip"] and len(peer["mgmt-ip"]) > 0:
peer["ip"] = peer["mgmt-ip"].split("/")[0]
if not peer_hostname: # if peer hostname is not specifically set, try to get it
# Determine the peer's IP address if the mgmt-ip is not empty
if peer["mgmt-ip"] and len(peer["mgmt-ip"]) > 0:
peer_hostname = peer["mgmt-ip"].split("/")[0]

# If the mgmt-ip is empty, use the mgmt-ipv6 field
elif peer["mgmt-ipv6"] and len(peer["mgmt-ipv6"]) > 0:
peer["ip"] = peer["mgmt-ipv6"].split("/")[0]
# If the mgmt-ip is empty, use the mgmt-ipv6 field
elif peer["mgmt-ipv6"] and len(peer["mgmt-ipv6"]) > 0:
peer_hostname = peer["mgmt-ipv6"].split("/")[0]

# If the mgmt-ip and mgmt-ipv6 fields are both empty, use the ha1-ipaddr field
elif peer["ha1-ipaddr"] and len(peer["ha1-ipaddr"]) > 0:
peer["ip"] = peer["ha1-ipaddr"]
# If the mgmt-ip and mgmt-ipv6 fields are both empty, use the ha1-ipaddr field
elif peer["ha1-ipaddr"] and len(peer["ha1-ipaddr"]) > 0:
peer_hostname = peer["ha1-ipaddr"]

else:
# no mgmt-ip or mgmt-ipv6 or ha1-ipaddr found, log message and sys.exit
logging.error(
f"{get_emoji(action='error')} {hostname}: No IP address found for the peer firewall. Exiting."
)
sys.exit(1)
else:
# no mgmt-ip or mgmt-ipv6 or ha1-ipaddr found, log message and sys.exit
logging.error(
f"{get_emoji(action='error')} {hostname}: No IP address found for the peer firewall. Exiting."
)
sys.exit(1)

firewall_objects_for_upgrade.append(Firewall(peer["ip"], username, password))
firewall_objects_for_upgrade.append(Firewall(peer_hostname, username, password))

# First round of upgrades, targeting all firewalls and placing active firewalls in an HA pair on a revisit list
with ThreadPoolExecutor(max_workers=2) as executor:
Expand Down Expand Up @@ -628,6 +639,14 @@ def batch(
help="Perform non-interactive upgrade with default options. Disables --dry-run option.",
),
] = False,
inventory: Annotated[
Optional[str],
typer.Option(
"--inventory",
"-i",
help="Preset firewall inventory to be upgraded as a comma delimited list of firewall hostnames. Takes precedence over inventory file. Provide both peers for an HA pair.",
),
] = None,
):
"""
Orchestrates a batch upgrade process for firewalls under Panorama's management. This command leverages Panorama
Expand All @@ -654,6 +673,8 @@ def batch(
If set, the command simulates the upgrade process without making any changes to the devices. Dry run is the default selection in interactive mode.
non_interactive: bool, optional
When set, the function performs all the upgrade steps without any prompts. Dry run is disabled in non interactive mode.
inventory: str, optional
Comma delimited list of firewall hostnames to be upgraded, it takes precedence over inventory file and avoids interactive inventory selection. Usable with non-interactive mode to provide firewalls as a CLI option. Both peers of an HA pair need to provided on HA upgrades.
Examples
--------
Expand Down Expand Up @@ -745,8 +766,12 @@ def batch(
firewalls_info=firewalls_info,
)

# --inventory cli option takes precedence over inventory file
if inventory:
user_selected_hostnames = inventory.split(",")

# Check if inventory.yaml exists and if it does, read the selected devices
if INVENTORY_FILE_PATH.exists():
elif INVENTORY_FILE_PATH.exists():
with open(INVENTORY_FILE_PATH, "r") as file:
inventory_data = yaml.safe_load(file)
user_selected_hostnames = inventory_data.get("firewalls_to_upgrade", [])
Expand All @@ -757,13 +782,24 @@ def batch(
user_selected_hostnames = select_devices_from_table(
firewall_mapping=firewall_mapping
) if not non_interactive else []

firewalls_to_upgrade = {
hostname: firewall_mapping[hostname]['ip-address']
for hostname in user_selected_hostnames
if hostname in firewall_mapping
}

absent_firewalls = set(user_selected_hostnames) - set(firewalls_to_upgrade)
if absent_firewalls:
logging.error(
f"{get_emoji(action='error')} Firewalls {list(absent_firewalls)} in inventory are absent in Panorama. Exiting."
)
sys.exit(1)

# Extracting the Firewall objects from the filtered mapping
firewall_objects_for_upgrade = [
firewall_mapping[hostname]["object"]
for hostname in user_selected_hostnames
if hostname in firewall_mapping
for hostname in firewalls_to_upgrade
]
logging.info(
f"{get_emoji(action='working')} {hostname}: Selected {len(firewall_objects_for_upgrade)} firewalls from inventory.yaml for upgrade."
Expand All @@ -781,8 +817,8 @@ def batch(

firewall_list = "\n".join(
[
f" - {firewall_mapping[hostname]['hostname']} ({firewall_mapping[hostname]['ip-address']})"
for hostname in user_selected_hostnames
f" - {hostname} ({ipaddr})"
for hostname,ipaddr in firewalls_to_upgrade.items()
]
)

Expand Down

0 comments on commit ee7aefa

Please sign in to comment.