From ae0f5f32ae6efb77be6795eba1be00eccd90a482 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Tue, 16 Jan 2024 10:47:10 -0600 Subject: [PATCH 1/7] Add support for dry run flag in upgrade script --- upgrade.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/upgrade.py b/upgrade.py index 9056777..a812812 100644 --- a/upgrade.py +++ b/upgrade.py @@ -184,6 +184,9 @@ class Args(BaseModel): api_key : str, optional API key for authentication with the Firewall appliance. Default is None. + dry_run : bool, optional + Flag to indicate whether the script should perform a dry run. + Default is False. hostname : str, optional Hostname or IP address of the Firewall appliance. Default is None. @@ -203,6 +206,7 @@ class Args(BaseModel): """ api_key: str = None + dry_run: bool = False hostname: str = None log_level: str = "info" password: str = None @@ -282,6 +286,9 @@ def parse_arguments() -> Args: If the hostname or target version is not provided either as CLI arguments or in the .env file, or if neither the API key nor both username and password are provided. """ + # Load environment variables first + load_environment_variables(".env") + parser = argparse.ArgumentParser( description="Script to interact with Firewall appliance." ) @@ -292,6 +299,13 @@ def parse_arguments() -> Args: default=None, help="API Key for authentication", ) + parser.add_argument( + "--dry-run", + dest="dry_run", + action="store_true", + default=os.getenv("DRY_RUN", "False").lower() == "true", + help="Dry run of the upgrade process", + ) parser.add_argument( "--hostname", dest="hostname", @@ -303,7 +317,7 @@ def parse_arguments() -> Args: "--log-level", dest="log_level", choices=LOGGING_LEVELS.keys(), - default="info", + default=os.getenv("LOG_LEVEL", "info"), help="Set the logging output level", ) parser.add_argument( @@ -337,6 +351,7 @@ def parse_arguments() -> Args: # Create a new structure to store arguments with different variable names arguments = { "api_key": args.api_key or os.getenv("API_KEY"), + "dry_run": args.dry_run or os.getenv("DRY_RUN"), "hostname": args.hostname or os.getenv("HOSTNAME"), "pan_username": args.username or os.getenv("PAN_USERNAME"), "pan_password": args.password or os.getenv("PAN_PASSWORD"), @@ -1204,9 +1219,13 @@ def main() -> None: ) logging.debug(backup_config) logging.info("Configuration backed up successfully") - # import ipdb - # ipdb.set_trace() + # Exit execution is dry_run is True + if args["dry_run"] is True: + logging.info("Dry run complete, exiting...") + sys.exit(0) + else: + logging.info("Not a dry run, continue with upgrade...") if __name__ == "__main__": From f9774169a4e3574f411b8d68d9816cb542904ad2 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Tue, 16 Jan 2024 10:54:15 -0600 Subject: [PATCH 2/7] Update default log level to "info" --- upgrade.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/upgrade.py b/upgrade.py index a812812..4fce0d9 100644 --- a/upgrade.py +++ b/upgrade.py @@ -356,7 +356,7 @@ def parse_arguments() -> Args: "pan_username": args.username or os.getenv("PAN_USERNAME"), "pan_password": args.password or os.getenv("PAN_PASSWORD"), "target_version": args.target_version or os.getenv("TARGET_VERSION"), - "log_level": args.log_level or os.getenv("LOG_LEVEL"), + "log_level": args.log_level or os.getenv("LOG_LEVEL") or "info", } # Check for missing hostname From 5e335304cf5d570a3fc3448aabaee822c1ee5ef7 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Tue, 16 Jan 2024 17:14:54 -0600 Subject: [PATCH 3/7] Update user authentication logic to use bcrypt hashing algorithm --- upgrade.py | 215 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 146 insertions(+), 69 deletions(-) diff --git a/upgrade.py b/upgrade.py index 4fce0d9..e01bec1 100644 --- a/upgrade.py +++ b/upgrade.py @@ -362,15 +362,16 @@ def parse_arguments() -> Args: # Check for missing hostname if not arguments["hostname"]: logging.error( - "Error: Hostname must be provided as a --hostname argument or in .env", + f"{get_emoji('error')} Hostname must be provided as a --hostname argument or in .env", ) sys.exit(1) # Check for missing target version if not arguments["target_version"]: logging.error( - "Error: Target version must be provided as a --version argument or in .env", + f"{get_emoji('error')} Target version must be provided as a --version argument or in .env", ) + logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) # Ensuring mutual exclusivity @@ -378,9 +379,10 @@ def parse_arguments() -> Args: arguments["pan_username"] = arguments["pan_password"] = None elif not (arguments["pan_username"] and arguments["pan_password"]): logging.error( - "Error: Provide either API key --api-key argument or both --username and --password", + f"{get_emoji('error')} Provide either API key --api-key argument or both --username and --password", file=sys.stderr, ) + logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) return arguments @@ -426,7 +428,7 @@ def configure_logging(level: str) -> None: # Create formatters and add them to the handlers console_format = logging.Formatter( - "%(name)s - %(levelname)s - %(message)s", + "%(levelname)s - %(message)s", ) file_format = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s", @@ -439,6 +441,21 @@ def configure_logging(level: str) -> None: logger.addHandler(file_handler) +def get_emoji(action): + emoji_map = { + "success": "✅", + "warning": "⚠️", + "error": "❌", + "working": "⚙️", + "report": "📝", + "search": "🔍", + "save": "💾", + "stop": "🛑", + "start": "🚀", + } + return emoji_map.get(action, "") + + # ---------------------------------------------------------------------------- # Helper function to flip XML objects into Python dictionaries # ---------------------------------------------------------------------------- @@ -500,14 +517,19 @@ def check_readiness_and_log( log_message = f'{test_info["description"]} - {test_result["reason"]}' if test_result["state"]: - logging.info(f"Passed Readiness Check: {test_info['description']}") + logging.info( + f"{get_emoji('success')} Passed Readiness Check: {test_info['description']}" + ) else: if test_info["log_level"] == "error": - logging.error(f"{log_message}") + logging.error(f"{get_emoji('error')} {log_message}") if test_info["exit_on_failure"]: + logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) elif test_info["log_level"] == "warning": - logging.info(f"Skipped Readiness Check: {test_info['description']}") + logging.info( + f"{get_emoji('report')} Skipped Readiness Check: {test_info['description']}" + ) else: logging.debug(log_message) @@ -558,8 +580,9 @@ def connect_to_firewall(args: dict) -> Firewall: if isinstance(target_device, panos.panorama.Panorama): logging.error( - "You are targeting a Panorama appliance, please target a firewall." + f"{get_emoji('error')} You are targeting a Panorama appliance, please target a firewall." ) + logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) return target_device @@ -627,20 +650,23 @@ def parse_version(version: str) -> Tuple[int, int, int, int]: f"{target_major}.{target_minor}.{target_maintenance}" ) - logging.info(f"Current PAN-OS version: {firewall.version}") + logging.info(f"{get_emoji('report')} Current PAN-OS version: {firewall.version}") logging.info( - f"Target PAN-OS version: {target_major}.{target_minor}.{target_maintenance}" + f"{get_emoji('report')} Target PAN-OS version: {target_major}.{target_minor}.{target_maintenance}" ) upgrade_needed = current_version < target_version if upgrade_needed: logging.info( - f"Confirmed that moving from {firewall.version} to {target_major}.{target_minor}.{target_maintenance} is an upgrade" + f"{get_emoji('success')} Confirmed that moving from {firewall.version} to {target_major}.{target_minor}.{target_maintenance} is an upgrade" ) return else: - logging.error("Upgrade is not required or a downgrade was attempted.") + logging.error( + f"{get_emoji('error')} Upgrade is not required or a downgrade was attempted." + ) + logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) @@ -696,20 +722,24 @@ def software_update_check( # check to see if target version is available for upgrade if target_version in available_versions: logging.info( - f"Target PAN-OS version {target_version} is available for download" + f"{get_emoji('success')} Target PAN-OS version {target_version} is available for download" ) # validate the target version's base image is already downloaded if available_versions[f"{target_major}.{target_minor}.0"]["downloaded"]: - logging.info(f"Base image for {target_version} is already downloaded") + logging.info( + f"{get_emoji('success')} Base image for {target_version} is already downloaded" + ) return True else: - logging.error(f"Base image for {target_version} is not downloaded") + logging.error( + f"{get_emoji('error')} Base image for {target_version} is not downloaded" + ) return False else: logging.error( - f"Target PAN-OS version {target_version} is not available for download" + f"{get_emoji('error')} Target PAN-OS version {target_version} is not available for download" ) return False @@ -743,13 +773,17 @@ def get_ha_status(firewall: Firewall) -> Tuple: >>> get_ha_status(fw) ('active/passive', {'ha_details': ...}) """ - logging.debug(f"Getting {firewall.serial} deployment information...") + logging.debug( + f"{get_emoji('start')} Getting {firewall.serial} deployment information..." + ) deployment_type = firewall.show_highavailability_state() - logging.debug(f"Firewall deployment: {deployment_type[0]}") + logging.debug(f"{get_emoji('report')} Firewall deployment: {deployment_type[0]}") if deployment_type[1]: ha_details = xml_to_dict(deployment_type[1]) - logging.debug(f"Firewall deployment details: {ha_details}") + logging.debug( + f"{get_emoji('report')} Firewall deployment details: {ha_details}" + ) return deployment_type[0], ha_details else: return deployment_type[0], None @@ -797,22 +831,28 @@ def software_download( """ if firewall.software.versions[target_version]["downloaded"]: - logging.info(f"PAN-OS version {target_version} already on firewall.") + logging.info( + f"{get_emoji('success')} PAN-OS version {target_version} already on firewall." + ) return True if ( not firewall.software.versions[target_version]["downloaded"] or firewall.software.versions[target_version]["downloaded"] != "downloading" ): - logging.info(f"PAN-OS version {target_version} is not on the firewall") + logging.info( + f"{get_emoji('search')} PAN-OS version {target_version} is not on the firewall" + ) start_time = time.time() try: - logging.info(f"PAN-OS version {target_version} is beginning download") + logging.info( + f"{get_emoji('start')} PAN-OS version {target_version} is beginning download" + ) firewall.software.download(target_version) except PanDeviceXapiError as download_error: - logging.error(download_error) + logging.error(f"{get_emoji('error')} {download_error}") sys.exit(1) while True: @@ -822,7 +862,7 @@ def software_download( if dl_status is True: logging.info( - f"{target_version} downloaded in {elapsed_time} seconds", + f"{get_emoji('success')} {target_version} downloaded in {elapsed_time} seconds", ) return True elif dl_status in (False, "downloading"): @@ -834,18 +874,20 @@ def software_download( ) if ha_details: logging.info( - f"{status_msg} - HA will sync image - Elapsed time: {elapsed_time} seconds" + f"{get_emoji('start')} {status_msg} - HA will sync image - Elapsed time: {elapsed_time} seconds" ) else: logging.info(f"{status_msg} - Elapsed time: {elapsed_time} seconds") else: - logging.error(f"Download failed after {elapsed_time} seconds") + logging.error( + f"{get_emoji('error')} Download failed after {elapsed_time} seconds" + ) return False time.sleep(30) else: - logging.error(f"Error downloading {target_version}.") + logging.error(f"{get_emoji('error')} Error downloading {target_version}.") sys.exit(1) @@ -909,11 +951,15 @@ def run_assurance( if operation_type == "readiness_check": for action in actions: if action not in AssuranceOptions.READINESS_CHECKS.keys(): - logging.error(f"Invalid action for readiness check: {action}") + logging.error( + f"{get_emoji('error')} Invalid action for readiness check: {action}" + ) sys.exit(1) try: - logging.info("Checking if firewall is ready for upgrade...") + logging.info( + f"{get_emoji('start')} Checking if firewall is ready for upgrade..." + ) result = checks_firewall.run_readiness_checks(actions) for ( @@ -925,14 +971,16 @@ def run_assurance( return ReadinessCheckReport(**result) except Exception as e: - logging.error(f"Error running readiness checks: {e}") + logging.error(f"{get_emoji('error')} Error running readiness checks: {e}") return None elif operation_type == "state_snapshot": # validate each type of action for action in actions: if action not in AssuranceOptions.STATE_SNAPSHOTS: - logging.error(f"Invalid action for state snapshot: {action}") + logging.error( + f"{get_emoji('error')} Invalid action for state snapshot: {action}" + ) return # take snapshots @@ -948,19 +996,21 @@ def run_assurance( return None except Exception as e: - logging.error("Error running snapshots: %s", e) + logging.error(f"{get_emoji('error')} Error running snapshots: %s", e) return elif operation_type == "report": for action in actions: if action not in AssuranceOptions.REPORTS: - logging.error(f"Invalid action for report: {action}") + logging.error( + f"{get_emoji('error')} Invalid action for report: {action}" + ) return - logging.info(f"Generating report: {action}") + logging.info(f"{get_emoji('report')} Generating report: {action}") # result = getattr(Report(firewall), action)(**config) else: - logging.error(f"Invalid operation type: {operation_type}") + logging.error(f"{get_emoji('error')} Invalid operation type: {operation_type}") return return results @@ -970,7 +1020,9 @@ def run_assurance( # Perform the snapshot of the network state # ---------------------------------------------------------------------------- def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: - logging.info("Taking a snapshot of network state information...") + logging.info( + f"{get_emoji('start')} Taking a snapshot of network state information..." + ) # take snapshots network_snapshot = run_assurance( @@ -991,7 +1043,7 @@ def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: # Check if a readiness check was successfully created if isinstance(network_snapshot, SnapshotReport): - logging.info("Network snapshot created successfully") + logging.info(f"{get_emoji('report')} Network snapshot created successfully") network_snapshot_json = network_snapshot.model_dump_json(indent=4) logging.debug(network_snapshot_json) @@ -1001,17 +1053,19 @@ def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: file.write(network_snapshot_json) logging.info( - f"Network state snapshot collected from {hostname}, saved to {file_path}" + f"{get_emoji('save')} Network state snapshot collected from {hostname}, saved to {file_path}" ) else: - logging.error("Failed to create snapshot") + logging.error(f"{get_emoji('error')} Failed to create snapshot") # ---------------------------------------------------------------------------- # Perform the readiness checks # ---------------------------------------------------------------------------- def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) -> None: - logging.info("Performing readiness checks of target firewall...") + logging.info( + f"{get_emoji('start')} Performing readiness checks of target firewall..." + ) readiness_check = run_assurance( firewall, @@ -1034,7 +1088,7 @@ def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) # Check if a readiness check was successfully created if isinstance(readiness_check, ReadinessCheckReport): # Do something with the readiness check report, e.g., log it, save it, etc. - logging.info("Readiness Checks completed") + logging.info(f"{get_emoji('success')} Readiness Checks completed") readiness_check_report_json = readiness_check.model_dump_json(indent=4) logging.debug(readiness_check_report_json) @@ -1043,9 +1097,11 @@ def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) with open(file_path, "w") as file: file.write(readiness_check_report_json) - logging.info(f"Readiness checks completed for {hostname}, saved to {file_path}") + logging.info( + f"{get_emoji('save')} Readiness checks completed for {hostname}, saved to {file_path}" + ) else: - logging.error("Failed to create readiness check") + logging.error(f"{get_emoji('error')} Failed to create readiness check") # ---------------------------------------------------------------------------- @@ -1075,7 +1131,9 @@ def backup_configuration( # Run operational command to retrieve configuration config_xml = firewall.op("show config running") if config_xml is None: - logging.error("Failed to retrieve running configuration.") + logging.error( + f"{get_emoji('error')} Failed to retrieve running configuration." + ) return False # Check XML structure @@ -1084,7 +1142,9 @@ def backup_configuration( or len(config_xml) == 0 or config_xml[0].tag != "result" ): - logging.error("Unexpected XML structure in configuration data.") + logging.error( + f"{get_emoji('error')} Unexpected XML structure in configuration data." + ) return False # Extract the configuration data from the tag @@ -1100,11 +1160,13 @@ def backup_configuration( with open(file_path, "w") as file: file.write(config_str) - logging.info(f"Configuration backed up successfully to {file_path}") + logging.info( + f"{get_emoji('save')} Configuration backed up successfully to {file_path}" + ) return True except Exception as e: - logging.error(f"Error backing up configuration: {e}") + logging.error(f"{get_emoji('error')} Error backing up configuration: {e}") return False @@ -1142,52 +1204,60 @@ def main() -> None: # Create our connection to the firewall logging.debug("Connecting to PAN-OS firewall...") firewall = connect_to_firewall(args) - logging.info("Connection established") + logging.info(f"{get_emoji('start')} Connection established") # Refresh system information to ensure we have the latest data logging.debug("Refreshing system information...") firewall_details = SystemSettings.refreshall(firewall)[0] logging.info( - f"{firewall.serial} {firewall_details.hostname} {firewall_details.ip_address}" + f"{get_emoji('report')} {firewall.serial} {firewall_details.hostname} {firewall_details.ip_address}" ) # Determine if the firewall is standalone, HA, or in a cluster - logging.debug("Checking if firewall is standalone, HA, or in a cluster...") + logging.debug( + f"{get_emoji('start')} Checking if firewall is standalone, HA, or in a cluster..." + ) deploy_info, ha_details = get_ha_status(firewall) - logging.info(f"Firewall HA mode: {deploy_info}") - logging.debug(f"Firewall HA details: {ha_details}") + logging.info(f"{get_emoji('report')} Firewall HA mode: {deploy_info}") + logging.debug(f"{get_emoji('report')} Firewall HA details: {ha_details}") # Check to see if the firewall is ready for an upgrade - logging.debug("Checking firewall readiness...") + logging.debug(f"{get_emoji('start')} Checking firewall readiness...") update_available = software_update_check( firewall, args["target_version"], ha_details ) - logging.debug("Firewall readiness check complete") + logging.debug(f"{get_emoji('report')} Firewall readiness check complete") # gracefully exit if the firewall is not ready for an upgrade to target version if not update_available: logging.error( - f"Firewall is not ready for upgrade to {args['target_version']}.", + f"{get_emoji('error')} Firewall is not ready for upgrade to {args['target_version']}.", ) sys.exit(1) # Download the target PAN-OS version - logging.info(f"Checking if {args['target_version']} is downloaded...") + logging.info( + f"{get_emoji('start')} Checking if {args['target_version']} is downloaded..." + ) image_downloaded = software_download(firewall, args["target_version"], ha_details) if deploy_info == "active" or deploy_info == "passive": logging.info( - f"{args['target_version']} has been downloaded and sync'd to HA peer." + f"{get_emoji('success')} {args['target_version']} has been downloaded and sync'd to HA peer." ) else: - logging.info(f"PAN-OS version {args['target_version']} has been downloaded.") + logging.info( + f"{get_emoji('success')} PAN-OS version {args['target_version']} has been downloaded." + ) # Begin snapshots of the network state if not image_downloaded: - logging.error("Image not downloaded, exiting...") + logging.error(f"{get_emoji('error')} Image not downloaded, exiting...") sys.exit(1) # Execute the pre-upgrade snapshot - logging.info("Taking a pre-upgrade snapshot of network state information...") + logging.info( + f"{get_emoji('start')} Taking a pre-upgrade snapshot of network state information..." + ) perform_snapshot( firewall, firewall_details.hostname, @@ -1195,7 +1265,9 @@ def main() -> None: ) # Execute Readiness Checks - logging.info("Checking device to see if its ready for an upgrade...") + logging.info( + f"{get_emoji('start')} Checking device to see if its ready for an upgrade..." + ) perform_readiness_checks( firewall, firewall_details.hostname, @@ -1204,28 +1276,33 @@ def main() -> None: # If the firewall is in an HA pair, check the HA peer to ensure sync has been enabled if ha_details: - logging.info("Checking HA peer to ensure the two are in sync...") + logging.info( + f"{get_emoji('start')} Checking HA peer to ensure the two are in sync..." + ) if ha_details["response"]["result"]["group"]["running-sync"] == "synchronized": - logging.info("HA peer sync has been completed") + logging.info(f"{get_emoji('success')} HA peer sync has been completed") else: - logging.error("HA peer state is not in sync") + logging.error(f"{get_emoji('error')} HA peer state is not in sync") + logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) # Back up configuration to local filesystem - logging.info("Backing up configuration to local filesystem...") + logging.info( + f"{get_emoji('start')} Backing up configuration to local filesystem..." + ) backup_config = backup_configuration( firewall, f'assurance/configurations/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.xml', ) - logging.debug(backup_config) - logging.info("Configuration backed up successfully") + logging.debug(f"{get_emoji('report')} {backup_config}") # Exit execution is dry_run is True if args["dry_run"] is True: - logging.info("Dry run complete, exiting...") + logging.info(f"{get_emoji('success')} Dry run complete, exiting...") + logging.info(f"{get_emoji('stop')} Halting script.") sys.exit(0) else: - logging.info("Not a dry run, continue with upgrade...") + logging.info(f"{get_emoji('start')} Not a dry run, continue with upgrade...") if __name__ == "__main__": From 6b5f256973ee548b1715678d57ec2e8c6b52a7b6 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Wed, 17 Jan 2024 05:37:41 -0600 Subject: [PATCH 4/7] Improve argparse to include help and other functions --- upgrade.py | 126 +++++++++++++++++++++++++++++------------------------ 1 file changed, 69 insertions(+), 57 deletions(-) diff --git a/upgrade.py b/upgrade.py index e01bec1..4daf4c0 100644 --- a/upgrade.py +++ b/upgrade.py @@ -290,28 +290,48 @@ def parse_arguments() -> Args: load_environment_variables(".env") parser = argparse.ArgumentParser( - description="Script to interact with Firewall appliance." + description="This script interacts with a Firewall appliance to perform readiness checks, " + "snapshots, and configuration backups in before and after its upgrade. If arguments are not " + "provided, the script will attempt to load them from a .env file.", + epilog="For more information, visit https://cdot65.github.io/pan-os-upgrade.", ) - parser.add_argument( + + # Grouping authentication arguments + auth_group = parser.add_argument_group("Authentication") + auth_group.add_argument( + "--hostname", + dest="hostname", + type=str, + default=None, + help="Hostname of the PAN-OS appliance", + ) + auth_group.add_argument( "--api-key", dest="api_key", type=str, default=None, - help="API Key for authentication", + help="API Key for authentication with the Firewall appliance.", + ) + auth_group.add_argument( + "--username", + dest="username", + type=str, + help="Username for authentication with the Firewall appliance.", + ) + auth_group.add_argument( + "--password", + dest="password", + type=str, + help="Password for authentication.", ) + + # Other arguments parser.add_argument( "--dry-run", dest="dry_run", action="store_true", default=os.getenv("DRY_RUN", "False").lower() == "true", - help="Dry run of the upgrade process", - ) - parser.add_argument( - "--hostname", - dest="hostname", - type=str, - default=None, - help="Hostname of the PAN-OS appliance", + help="Perform a dry run of all tests and downloads without performing the actual upgrade.", ) parser.add_argument( "--log-level", @@ -320,20 +340,6 @@ def parse_arguments() -> Args: default=os.getenv("LOG_LEVEL", "info"), help="Set the logging output level", ) - parser.add_argument( - "--password", - dest="password", - type=str, - default=None, - help="Password for authentication", - ) - parser.add_argument( - "--username", - dest="username", - type=str, - default=None, - help="Username for authentication", - ) parser.add_argument( "--version", dest="target_version", @@ -427,12 +433,20 @@ def configure_logging(level: str) -> None: ) # Create formatters and add them to the handlers - console_format = logging.Formatter( - "%(levelname)s - %(message)s", - ) - file_format = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) + if level == "debug": + console_format = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + file_format = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + else: + console_format = logging.Formatter( + "%(levelname)s - %(message)s", + ) + file_format = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) console_handler.setFormatter(console_format) file_handler.setFormatter(file_format) @@ -527,7 +541,7 @@ def check_readiness_and_log( logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) elif test_info["log_level"] == "warning": - logging.info( + logging.debug( f"{get_emoji('report')} Skipped Readiness Check: {test_info['description']}" ) else: @@ -874,7 +888,7 @@ def software_download( ) if ha_details: logging.info( - f"{get_emoji('start')} {status_msg} - HA will sync image - Elapsed time: {elapsed_time} seconds" + f"{get_emoji('working')} {status_msg} - HA will sync image - Elapsed time: {elapsed_time} seconds" ) else: logging.info(f"{status_msg} - Elapsed time: {elapsed_time} seconds") @@ -958,7 +972,7 @@ def run_assurance( try: logging.info( - f"{get_emoji('start')} Checking if firewall is ready for upgrade..." + f"{get_emoji('start')} Executing readiness checks to determine if firewall is ready for upgrade..." ) result = checks_firewall.run_readiness_checks(actions) @@ -1021,7 +1035,7 @@ def run_assurance( # ---------------------------------------------------------------------------- def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: logging.info( - f"{get_emoji('start')} Taking a snapshot of network state information..." + f"{get_emoji('start')} Executing snapshot of network state information..." ) # take snapshots @@ -1043,7 +1057,7 @@ def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: # Check if a readiness check was successfully created if isinstance(network_snapshot, SnapshotReport): - logging.info(f"{get_emoji('report')} Network snapshot created successfully") + logging.info(f"{get_emoji('success')} Network snapshot created successfully") network_snapshot_json = network_snapshot.model_dump_json(indent=4) logging.debug(network_snapshot_json) @@ -1052,7 +1066,7 @@ def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: with open(file_path, "w") as file: file.write(network_snapshot_json) - logging.info( + logging.debug( f"{get_emoji('save')} Network state snapshot collected from {hostname}, saved to {file_path}" ) else: @@ -1063,8 +1077,8 @@ def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: # Perform the readiness checks # ---------------------------------------------------------------------------- def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) -> None: - logging.info( - f"{get_emoji('start')} Performing readiness checks of target firewall..." + logging.debug( + f"{get_emoji('start')} Executing readiness checks of target firewall..." ) readiness_check = run_assurance( @@ -1097,7 +1111,7 @@ def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) with open(file_path, "w") as file: file.write(readiness_check_report_json) - logging.info( + logging.debug( f"{get_emoji('save')} Readiness checks completed for {hostname}, saved to {file_path}" ) else: @@ -1160,7 +1174,7 @@ def backup_configuration( with open(file_path, "w") as file: file.write(config_str) - logging.info( + logging.debug( f"{get_emoji('save')} Configuration backed up successfully to {file_path}" ) return True @@ -1202,12 +1216,12 @@ def main() -> None: configure_logging(args["log_level"]) # Create our connection to the firewall - logging.debug("Connecting to PAN-OS firewall...") + logging.debug(f"{get_emoji('start')} Connecting to PAN-OS firewall...") firewall = connect_to_firewall(args) - logging.info(f"{get_emoji('start')} Connection established") + logging.info(f"{get_emoji('success')} Connection to firewall established") # Refresh system information to ensure we have the latest data - logging.debug("Refreshing system information...") + logging.debug(f"{get_emoji('start')} Refreshing system information...") firewall_details = SystemSettings.refreshall(firewall)[0] logging.info( f"{get_emoji('report')} {firewall.serial} {firewall_details.hostname} {firewall_details.ip_address}" @@ -1215,14 +1229,16 @@ def main() -> None: # Determine if the firewall is standalone, HA, or in a cluster logging.debug( - f"{get_emoji('start')} Checking if firewall is standalone, HA, or in a cluster..." + f"{get_emoji('start')} Executing test to see if firewall is standalone, HA, or in a cluster..." ) deploy_info, ha_details = get_ha_status(firewall) logging.info(f"{get_emoji('report')} Firewall HA mode: {deploy_info}") logging.debug(f"{get_emoji('report')} Firewall HA details: {ha_details}") # Check to see if the firewall is ready for an upgrade - logging.debug(f"{get_emoji('start')} Checking firewall readiness...") + logging.debug( + f"{get_emoji('start')} Executing test to validate firewall's readiness..." + ) update_available = software_update_check( firewall, args["target_version"], ha_details ) @@ -1237,7 +1253,7 @@ def main() -> None: # Download the target PAN-OS version logging.info( - f"{get_emoji('start')} Checking if {args['target_version']} is downloaded..." + f"{get_emoji('start')} Executing test to see if {args['target_version']} is already downloaded..." ) image_downloaded = software_download(firewall, args["target_version"], ha_details) if deploy_info == "active" or deploy_info == "passive": @@ -1255,9 +1271,6 @@ def main() -> None: sys.exit(1) # Execute the pre-upgrade snapshot - logging.info( - f"{get_emoji('start')} Taking a pre-upgrade snapshot of network state information..." - ) perform_snapshot( firewall, firewall_details.hostname, @@ -1265,9 +1278,6 @@ def main() -> None: ) # Execute Readiness Checks - logging.info( - f"{get_emoji('start')} Checking device to see if its ready for an upgrade..." - ) perform_readiness_checks( firewall, firewall_details.hostname, @@ -1277,18 +1287,20 @@ def main() -> None: # If the firewall is in an HA pair, check the HA peer to ensure sync has been enabled if ha_details: logging.info( - f"{get_emoji('start')} Checking HA peer to ensure the two are in sync..." + f"{get_emoji('start')} Executing test to see if HA peer is in sync..." ) if ha_details["response"]["result"]["group"]["running-sync"] == "synchronized": - logging.info(f"{get_emoji('success')} HA peer sync has been completed") + logging.info(f"{get_emoji('success')} HA peer sync test has been completed") else: - logging.error(f"{get_emoji('error')} HA peer state is not in sync") + logging.error( + f"{get_emoji('error')} HA peer state is not in sync, please try again" + ) logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) # Back up configuration to local filesystem logging.info( - f"{get_emoji('start')} Backing up configuration to local filesystem..." + f"{get_emoji('start')} Executing backup of {firewall_details.hostname}'s configuration to local filesystem..." ) backup_config = backup_configuration( firewall, From 705ef3a1c778d30d1a4ec5a2f860dfa2595c65a1 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Wed, 17 Jan 2024 06:07:54 -0600 Subject: [PATCH 5/7] Add flake8@7.0.0 to enabled linters --- .trunk/trunk.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.trunk/trunk.yaml b/.trunk/trunk.yaml index 4f396fd..6f8c7c0 100644 --- a/.trunk/trunk.yaml +++ b/.trunk/trunk.yaml @@ -19,6 +19,7 @@ lint: disabled: - isort enabled: + - flake8@7.0.0 - bandit@1.7.6 - black@23.12.1 - dotenv-linter@3.3.0 From bc4030f70ccabd49aa2b62cd12acd2e6264c51a3 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Wed, 17 Jan 2024 07:43:54 -0600 Subject: [PATCH 6/7] update docstrings --- upgrade.py | 145 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 127 insertions(+), 18 deletions(-) diff --git a/upgrade.py b/upgrade.py index 4daf4c0..076a09d 100644 --- a/upgrade.py +++ b/upgrade.py @@ -455,7 +455,35 @@ def configure_logging(level: str) -> None: logger.addHandler(file_handler) -def get_emoji(action): +def get_emoji(action: str) -> str: + """ + Retrieve the corresponding emoji for a given action. + + Maps a specified action string to its corresponding emoji character based on a predefined set + of mappings. If the action does not have an associated emoji in the map, an empty string is returned. + + Parameters + ---------- + action : str + The action keyword for which the corresponding emoji is desired. Possible actions include + 'success', 'warning', 'error', 'working', 'report', 'search', 'save', 'stop', and 'start'. + + Returns + ------- + str + The emoji character associated with the given action, or an empty string if the action is not recognized. + + Examples + -------- + >>> get_emoji('success') + '✅' + + >>> get_emoji('error') + '❌' + + >>> get_emoji('unknown') + '' + """ emoji_map = { "success": "✅", "warning": "⚠️", @@ -525,6 +553,31 @@ def check_readiness_and_log( test_name: str, test_info: dict, ): + """ + Check readiness test results and log the outcome. + + This function evaluates the outcome of a specific readiness test based on its result. + It logs the test outcome using different log levels (info, warning, error) depending + on the test's importance and its result. In cases where the test result is critical and + the test fails, the script may exit. + + Parameters + ---------- + result : dict + A dictionary containing the results of readiness tests. Each key is a test name, and + the value is another dictionary with 'state' and 'reason' keys. + test_name : str + The name of the test to check the result for. + test_info : dict + A dictionary containing information about the test, including a description, the log level, + and whether the script should exit on failure. + + Notes + ----- + The function uses the `get_emoji` helper to add relevant emojis to the log messages for + better visual distinction of the test outcomes. The test result is considered a pass if + the 'state' is True; otherwise, it's a fail or skip, depending on the 'log_level'. + """ test_result = result.get( test_name, {"state": False, "reason": "Test not performed"} ) @@ -1033,7 +1086,35 @@ def run_assurance( # ---------------------------------------------------------------------------- # Perform the snapshot of the network state # ---------------------------------------------------------------------------- -def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: +def perform_snapshot( + firewall: Firewall, + hostname: str, + file_path: str, +) -> None: + """ + Perform a snapshot of the network state on the specified firewall and save it to a file. + + Executes a series of network state information collections (such as arp_table, content_version, + ip_sec_tunnels, etc.) on the provided firewall instance. The collected data is then saved as a + JSON file to the specified file path. Logs the start, success, or failure of the snapshot operation. + + Parameters + ---------- + firewall : Firewall + An instance of the Firewall class representing the firewall to collect the network state from. + hostname : str + The hostname of the firewall. Used for logging purposes. + file_path : str + The file path where the network state snapshot JSON will be saved. + + Notes + ----- + - The function leverages the `run_assurance` function for collecting network state information. + - If the snapshot is successful, the JSON representation of the network state is saved to the specified file. + - If the snapshot fails, an error is logged. + - This function also ensures that the directory for the file path exists before saving. + """ + logging.info( f"{get_emoji('start')} Executing snapshot of network state information..." ) @@ -1076,7 +1157,36 @@ def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: # ---------------------------------------------------------------------------- # Perform the readiness checks # ---------------------------------------------------------------------------- -def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) -> None: +def perform_readiness_checks( + firewall: Firewall, + hostname: str, + file_path: str, +) -> None: + """ + Execute and handle the results of readiness checks on a specified firewall. + + Initiates various readiness checks on the target firewall to determine its state and suitability + for further operations, such as upgrades. The checks include candidate config, content version, + expired licenses, HA status, job status, disk space, NTP synchronization, Panorama connection, + and planes clock synchronization. The results are logged, and a detailed report is saved to the + specified file path. + + Parameters + ---------- + firewall : Firewall + An instance of the Firewall class representing the firewall to perform readiness checks on. + hostname : str + The hostname of the firewall on which the readiness checks are performed. + file_path : str + The file path where the readiness check report will be saved. + + Notes + ----- + - If the readiness checks are successful, a ReadinessCheckReport is generated, logged, and saved. + - If the readiness checks fail, an error message is logged. + - The function uses emojis for logging to enhance readability and user experience. + """ + logging.debug( f"{get_emoji('start')} Executing readiness checks of target firewall..." ) @@ -1191,26 +1301,25 @@ def main() -> None: """ Main function of the script, serving as the entry point. - Handles CLI arguments, configures logging, and establishes a connection to the Firewall appliance. - Performs a series of operations, including refreshing system information, checking for software updates, - downloading the target PAN-OS version, and collecting network state information for upgrade assurance. - It conducts readiness checks and takes a pre-upgrade snapshot of the network state. Logs progress and - status throughout the process. - - The function gracefully exits with an error if conditions for the upgrade are not met or if any critical - issues are encountered during execution. It enters a debug mode upon successful completion of all operations. + Orchestrates the workflow for checking and preparing a PAN-OS firewall for an upgrade. This includes: + - Parsing command-line arguments. + - Configuring logging based on specified log level. + - Connecting to the firewall and refreshing its system information. + - Determining the firewall's deployment status (standalone, HA, cluster). + - Checking firewall readiness for the target PAN-OS version and downloading it if necessary. + - Performing pre-upgrade network state snapshots and readiness checks. + - Conducting HA peer synchronization checks and backup of current configuration. - Operations: - - Connects to the Firewall and refreshes system information. - - Determines deployment status (standalone, HA, cluster). - - Assesses readiness for upgrade and downloads the target PAN-OS version. - - Collects network state information and performs readiness checks. + The function uses emojis to enhance log readability and provides detailed feedback throughout the process. + If the 'dry_run' argument is set to True, the script performs checks without initiating the actual upgrade. Raises ------ SystemExit - If the firewall is not ready for an upgrade to the target version, or - if there are other critical issues preventing the continuation of the script. + - If the firewall is not ready for an upgrade to the target version. + - If there are critical issues preventing the continuation of the script. + - If the HA peer state is not synchronized. + - Upon completion of a dry run. """ args = parse_arguments() configure_logging(args["log_level"]) From 8f65023ccb71bb1fec5d70398d34b928726cb7c8 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Thu, 18 Jan 2024 09:16:55 -0600 Subject: [PATCH 7/7] adding upgrade and reboot process --- upgrade.py | 286 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 253 insertions(+), 33 deletions(-) diff --git a/upgrade.py b/upgrade.py index 076a09d..6bade39 100644 --- a/upgrade.py +++ b/upgrade.py @@ -5,7 +5,7 @@ import sys import time from logging.handlers import RotatingFileHandler -from typing import Dict, List, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union # trunk-ignore(bandit/B405) import xml.etree.ElementTree as ET @@ -14,7 +14,12 @@ import panos from panos.base import PanDevice from panos.device import SystemSettings -from panos.errors import PanDeviceXapiError +from panos.errors import ( + PanDeviceXapiError, + PanXapiError, + PanConnectionTimeout, + PanURLError, +) from panos.firewall import Firewall # Palo Alto Networks panos-upgrade-assurance imports @@ -107,7 +112,7 @@ class AssuranceOptions: "free_disk_space": { "description": "Check if a there is enough space on the `/opt/panrepo` volume for downloading an PanOS image.", "log_level": "error", - "exit_on_failure": True, + "exit_on_failure": False, }, "ha": { "description": "Checks HA pair status from the perspective of the current device", @@ -370,6 +375,7 @@ def parse_arguments() -> Args: logging.error( f"{get_emoji('error')} Hostname must be provided as a --hostname argument or in .env", ) + sys.exit(1) # Check for missing target version @@ -378,6 +384,7 @@ def parse_arguments() -> Args: f"{get_emoji('error')} Target version must be provided as a --version argument or in .env", ) logging.error(f"{get_emoji('stop')} Halting script.") + sys.exit(1) # Ensuring mutual exclusivity @@ -389,6 +396,7 @@ def parse_arguments() -> Args: file=sys.stderr, ) logging.error(f"{get_emoji('stop')} Halting script.") + sys.exit(1) return arguments @@ -592,6 +600,7 @@ def check_readiness_and_log( logging.error(f"{get_emoji('error')} {log_message}") if test_info["exit_on_failure"]: logging.error(f"{get_emoji('stop')} Halting script.") + sys.exit(1) elif test_info["log_level"] == "warning": logging.debug( @@ -632,27 +641,39 @@ def connect_to_firewall(args: dict) -> Firewall: SystemExit If the target device is a Panorama appliance or if the required credentials are not provided. """ - # Conditional connection logic - if args["api_key"]: - target_device = PanDevice.create_from_device( - args["hostname"], - api_key=args["api_key"], - ) - else: - target_device = PanDevice.create_from_device( - args["hostname"], - args["pan_username"], - args["pan_password"], - ) + try: + # Build a connection using either an API key or username/password combination + if args["api_key"]: + target_device = PanDevice.create_from_device( + args["hostname"], + api_key=args["api_key"], + ) + else: + target_device = PanDevice.create_from_device( + args["hostname"], + args["pan_username"], + args["pan_password"], + ) - if isinstance(target_device, panos.panorama.Panorama): + if isinstance(target_device, panos.panorama.Panorama): + logging.error( + f"{get_emoji('error')} You are targeting a Panorama appliance, please target a firewall." + ) + sys.exit(1) + + return target_device + + except PanConnectionTimeout: logging.error( - f"{get_emoji('error')} You are targeting a Panorama appliance, please target a firewall." + f"{get_emoji('error')} Connection to the firewall timed out. Please check the hostname and network connectivity." ) - logging.error(f"{get_emoji('stop')} Halting script.") sys.exit(1) - return target_device + except Exception as e: + logging.error( + f"{get_emoji('error')} An error occurred while connecting to the firewall: {e}" + ) + sys.exit(1) # ---------------------------------------------------------------------------- @@ -734,6 +755,7 @@ def parse_version(version: str) -> Tuple[int, int, int, int]: f"{get_emoji('error')} Upgrade is not required or a downgrade was attempted." ) logging.error(f"{get_emoji('stop')} Halting script.") + sys.exit(1) @@ -920,6 +942,7 @@ def software_download( firewall.software.download(target_version) except PanDeviceXapiError as download_error: logging.error(f"{get_emoji('error')} {download_error}") + sys.exit(1) while True: @@ -955,6 +978,7 @@ def software_download( else: logging.error(f"{get_emoji('error')} Error downloading {target_version}.") + sys.exit(1) @@ -969,7 +993,7 @@ def run_assurance( config: Dict[str, Union[str, int, float, bool]], ) -> Union[SnapshotReport, ReadinessCheckReport, None]: """ - Execute operational tasks on the Firewall and return the results or generate reports. + Perform operational tasks on the Firewall and return the results or generate reports. Handles various operational tasks on the Firewall based on 'operation_type', such as performing readiness checks, capturing state snapshots, or generating reports. The function @@ -984,7 +1008,7 @@ def run_assurance( hostname : str Hostname of the firewall. operation_type : str - Type of operation to be executed, e.g., 'readiness_check', 'state_snapshot', 'report'. + Type of operation to be performed, e.g., 'readiness_check', 'state_snapshot', 'report'. actions : List[str] List of specific actions to be performed within the operation type. config : Dict[str, Union[str, int, float, bool]] @@ -1021,11 +1045,12 @@ def run_assurance( logging.error( f"{get_emoji('error')} Invalid action for readiness check: {action}" ) + sys.exit(1) try: logging.info( - f"{get_emoji('start')} Executing readiness checks to determine if firewall is ready for upgrade..." + f"{get_emoji('start')} Performing readiness checks to determine if firewall is ready for upgrade..." ) result = checks_firewall.run_readiness_checks(actions) @@ -1094,7 +1119,7 @@ def perform_snapshot( """ Perform a snapshot of the network state on the specified firewall and save it to a file. - Executes a series of network state information collections (such as arp_table, content_version, + Performs a series of network state information collections (such as arp_table, content_version, ip_sec_tunnels, etc.) on the provided firewall instance. The collected data is then saved as a JSON file to the specified file path. Logs the start, success, or failure of the snapshot operation. @@ -1116,7 +1141,7 @@ def perform_snapshot( """ logging.info( - f"{get_emoji('start')} Executing snapshot of network state information..." + f"{get_emoji('start')} Performing snapshot of network state information..." ) # take snapshots @@ -1163,7 +1188,7 @@ def perform_readiness_checks( file_path: str, ) -> None: """ - Execute and handle the results of readiness checks on a specified firewall. + Perform and handle the results of readiness checks on a specified firewall. Initiates various readiness checks on the target firewall to determine its state and suitability for further operations, such as upgrades. The checks include candidate config, content version, @@ -1188,7 +1213,7 @@ def perform_readiness_checks( """ logging.debug( - f"{get_emoji('start')} Executing readiness checks of target firewall..." + f"{get_emoji('start')} Performing readiness checks of target firewall..." ) readiness_check = run_assurance( @@ -1294,6 +1319,187 @@ def backup_configuration( return False +# ---------------------------------------------------------------------------- +# Perform the upgrade process +# ---------------------------------------------------------------------------- +def perform_upgrade( + firewall: Firewall, + hostname: str, + target_version: str, + ha_details: Optional[dict] = None, +) -> None: + """ + Perform the upgrade process for the specified firewall. + + Performs the upgrade of the firewall to the target PAN-OS version. This includes initiating the + upgrade, handling High Availability (HA) considerations if applicable, and rebooting the firewall. + The function logs each step and handles any errors that occur during the process. + + Parameters + ---------- + firewall : Firewall + An instance of the Firewall class representing the firewall to be upgraded. + hostname : str + The hostname of the firewall. Used for logging purposes. + target_version : str + The target PAN-OS version to upgrade the firewall to. + ha_details : Optional[dict] + High Availability details of the firewall, if applicable. + + Raises + ------ + SystemExit + - If the upgrade job fails. + - If HA synchronization fails or times out. + - If the firewall reboot process fails or times out. + """ + + logging.info( + f"{get_emoji('start')} Performing upgrade to version {target_version}..." + ) + + try: + install_job = firewall.software.install(target_version, sync=True) + if install_job["success"]: + logging.info(f"{get_emoji('success')} Upgrade job completed successfully") + logging.debug(f"{get_emoji('report')} {install_job}") + else: + logging.error(f"{get_emoji('error')} Upgrade job failed") + + sys.exit(1) + + # Define timeout and start time + timeout = 300 # 5 minutes in seconds + ha_suspend_start_time = time.time() + + # First, check if ha_details exists + if ha_details: + while True: + try: + # Check if HA is enabled and synced + if ha_details["response"]["result"]["enabled"] == "yes": + logging.info( + f"{get_emoji('success')} HA peer sync test has been completed" + ) + logging.info( + f"{get_emoji('start')} Suspending HA state of firewall..." + ) + suspend_job = firewall.op( + "", + cmd_xml=False, + ) + suspend_job_result = xml_to_dict(suspend_job) + logging.info( + f"{get_emoji('report')} {suspend_job_result['response']['result']}" + ) + break # Exit the loop as the condition is met + else: + # If HA is enabled but not synced + current_time = time.time() + if current_time - ha_suspend_start_time > timeout: + logging.error( + f"{get_emoji('error')} Timeout reached while waiting for HA sync" + ) + break # Exit the loop after timeout + + logging.info( + f"{get_emoji('warning')} HA peer sync test was not successful, trying again in ten seconds..." + ) + time.sleep(10) + # Re-fetch the HA details here if necessary, to check the current status + + except KeyError: + # KeyError handling if 'enabled' key is not found + logging.error( + f"{get_emoji('error')} KeyError: Problem accessing HA details" + ) + break + + else: + logging.info( + f"{get_emoji('report')} Firewall is not in an HA pair, continuing in standalone mode..." + ) + + except PanDeviceXapiError as upgrade_error: + logging.error(f"{get_emoji('error')} {upgrade_error}") + + sys.exit(1) + + +# ---------------------------------------------------------------------------- +# Perform the reboot process +# ---------------------------------------------------------------------------- +def perform_reboot(firewall: Firewall, ha_details: Optional[dict] = None) -> None: + """ + Perform the reboot process for the specified firewall. + + Initiates the reboot of the firewall and waits until it comes back online. If the firewall is part + of a High Availability (HA) setup, it also checks for synchronization with its HA peer after rebooting. + The function logs each step and handles any errors that occur during the process. + + Parameters + ---------- + firewall : Firewall + An instance of the Firewall class representing the firewall to be rebooted. + ha_details : Optional[dict] + High Availability details of the firewall, if applicable. + + Raises + ------ + SystemExit + - If the reboot process fails or times out. + """ + + reboot_start_time = time.time() + rebooted = False + + logging.info(f"{get_emoji('start')} Rebooting the firewall...") + reboot_job = firewall.op( + "", cmd_xml=False + ) + reboot_job_result = xml_to_dict(reboot_job) + logging.info(f"{get_emoji('report')} {reboot_job_result['response']['result']}") + + while not rebooted: + try: + deploy_info, current_ha_details = get_ha_status(firewall) + if current_ha_details and deploy_info in ["active", "passive"]: + if ( + current_ha_details["response"]["result"]["group"]["running-sync"] + == "synchronized" + ): + logging.info( + f"{get_emoji('success')} Firewall rebooted and synchronized with its HA peer in {int(time.time() - reboot_start_time)} seconds" + ) + rebooted = True + else: + logging.info( + f"{get_emoji('working')} Firewall rebooted but not yet synchronized with its peer. Will try again in 30 seconds." + ) + time.sleep(30) + elif current_ha_details and deploy_info == "disabled": + logging.info( + f"{get_emoji('success')} Firewall rebooted in {int(time.time() - reboot_start_time)} seconds" + ) + rebooted = True + else: + logging.info( + f"{get_emoji('working')} Waiting for firewall to reboot and synchronize..." + ) + time.sleep(30) + + except (PanXapiError, PanConnectionTimeout, PanURLError): + logging.info(f"{get_emoji('working')} Firewall is rebooting...") + time.sleep(30) + + # Check if 20 minutes have passed + if time.time() - reboot_start_time > 1200: # 20 minutes in seconds + logging.error( + f"{get_emoji('error')} Firewall did not become available and/or establish a Connected sync state with its HA peer after 20 minutes. Please check the firewall status manually." + ) + break + + # ---------------------------------------------------------------------------- # Primary execution of the script # ---------------------------------------------------------------------------- @@ -1338,7 +1544,7 @@ def main() -> None: # Determine if the firewall is standalone, HA, or in a cluster logging.debug( - f"{get_emoji('start')} Executing test to see if firewall is standalone, HA, or in a cluster..." + f"{get_emoji('start')} Performing test to see if firewall is standalone, HA, or in a cluster..." ) deploy_info, ha_details = get_ha_status(firewall) logging.info(f"{get_emoji('report')} Firewall HA mode: {deploy_info}") @@ -1346,7 +1552,7 @@ def main() -> None: # Check to see if the firewall is ready for an upgrade logging.debug( - f"{get_emoji('start')} Executing test to validate firewall's readiness..." + f"{get_emoji('start')} Performing test to validate firewall's readiness..." ) update_available = software_update_check( firewall, args["target_version"], ha_details @@ -1358,11 +1564,12 @@ def main() -> None: logging.error( f"{get_emoji('error')} Firewall is not ready for upgrade to {args['target_version']}.", ) + sys.exit(1) # Download the target PAN-OS version logging.info( - f"{get_emoji('start')} Executing test to see if {args['target_version']} is already downloaded..." + f"{get_emoji('start')} Performing test to see if {args['target_version']} is already downloaded..." ) image_downloaded = software_download(firewall, args["target_version"], ha_details) if deploy_info == "active" or deploy_info == "passive": @@ -1377,16 +1584,17 @@ def main() -> None: # Begin snapshots of the network state if not image_downloaded: logging.error(f"{get_emoji('error')} Image not downloaded, exiting...") + sys.exit(1) - # Execute the pre-upgrade snapshot + # Perform the pre-upgrade snapshot perform_snapshot( firewall, firewall_details.hostname, f'assurance/snapshots/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json', ) - # Execute Readiness Checks + # Perform Readiness Checks perform_readiness_checks( firewall, firewall_details.hostname, @@ -1396,7 +1604,7 @@ def main() -> None: # If the firewall is in an HA pair, check the HA peer to ensure sync has been enabled if ha_details: logging.info( - f"{get_emoji('start')} Executing test to see if HA peer is in sync..." + f"{get_emoji('start')} Performing test to see if HA peer is in sync..." ) if ha_details["response"]["result"]["group"]["running-sync"] == "synchronized": logging.info(f"{get_emoji('success')} HA peer sync test has been completed") @@ -1405,11 +1613,12 @@ def main() -> None: f"{get_emoji('error')} HA peer state is not in sync, please try again" ) logging.error(f"{get_emoji('stop')} Halting script.") + sys.exit(1) # Back up configuration to local filesystem logging.info( - f"{get_emoji('start')} Executing backup of {firewall_details.hostname}'s configuration to local filesystem..." + f"{get_emoji('start')} Performing backup of {firewall_details.hostname}'s configuration to local filesystem..." ) backup_config = backup_configuration( firewall, @@ -1425,6 +1634,17 @@ def main() -> None: else: logging.info(f"{get_emoji('start')} Not a dry run, continue with upgrade...") + # Perform the upgrade + perform_upgrade( + firewall=firewall, + hostname=firewall_details.hostname, + target_version=args["target_version"], + ha_details=ha_details, + ) + + # Perform the reboot + perform_reboot(firewall=firewall, ha_details=ha_details) + if __name__ == "__main__": main()