From e646318ea7c4af8015a28e99e5c4ffeaa6601321 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Tue, 16 Jan 2024 07:04:20 -0600 Subject: [PATCH] add Readiness Checks and Snapshots to workflow --- upgrade.py | 296 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 186 insertions(+), 110 deletions(-) diff --git a/upgrade.py b/upgrade.py index 57e619b..23366ac 100644 --- a/upgrade.py +++ b/upgrade.py @@ -24,7 +24,7 @@ from pydantic import BaseModel # project imports -from models import AssuranceReport +from models import SnapshotReport, ReadinessCheckReport # ---------------------------------------------------------------------------- @@ -52,82 +52,118 @@ class AssuranceOptions: Attributes ---------- - READINESS_CHECKS : list of str - A list of readiness checks to be performed on the PAN-OS appliance. These checks assess system and - network parameters like active support status, ARP entry existence, candidate configuration, etc. - - STATE_SNAPSHOTS : list of str - A list of state snapshot types to capture from the PAN-OS appliance. These snapshots record critical - data regarding the appliance's current state, such as ARP table, content version, IPsec tunnels, etc. + READINESS_CHECKS : dict + A dictionary mapping each readiness check to its description, log level, and whether to exit on failure. + This provides a more detailed context for each check, allowing for tailored logging and error handling. REPORTS : list of str A list of report types that can be generated for the PAN-OS appliance. These reports provide detailed information on various aspects of the appliance, including ARP table, content version, IPsec tunnels, license details, and more. - READINESS_CHECK_DESCRIPTIONS : dict - A dictionary mapping each readiness check to its description, log level, and whether to exit on failure. - This provides a more detailed context for each check, allowing for tailored logging and error handling. + STATE_SNAPSHOTS : list of str + A list of state snapshot types to capture from the PAN-OS appliance. These snapshots record critical + data regarding the appliance's current state, such as ARP table, content version, IPsec tunnels, etc. """ - READINESS_CHECKS = [ - "active_support", - "arp_entry_exist", - "candidate_config", - "content_version", - "free_disk_space", - "expired_licenses", - "ha", - "ip_sec_tunnel_status", - "ntp_sync", - "panorama", - "planes_clock_sync", - "session_exist", - ] - - STATE_SNAPSHOTS = [ - "arp_table", - "content_version", - "ip_sec_tunnels", - "license", - "nics", - "routes", - "session_stats", - ] - - REPORTS = [ - "arp_table", - "content_version", - "ip_sec_tunnels", - "license", - "nics", - "routes", - "session_stats", - ] - - READINESS_CHECK_DESCRIPTIONS = { + READINESS_CHECKS = { + "active_support": { + "description": "Check if active support is available", + "log_level": "warning", + "exit_on_failure": False, + }, + "arp_entry_exist": { + "description": "Check if a given ARP entry is available in the ARP table", + "log_level": "warning", + "exit_on_failure": False, + }, "candidate_config": { - "description": "No Pending Configuration Changes", + "description": "Check if there are pending changes on device", "log_level": "error", "exit_on_failure": True, }, + "certificates_requirements": { + "description": "Check if the certificates' keys meet minimum size requirements", + "log_level": "warning", + "exit_on_failure": False, + }, "content_version": { "description": "Running Latest Content Version", "log_level": "warning", "exit_on_failure": False, }, + "dynamic_updates": { + "description": "Check if any Dynamic Update job is scheduled to run within the specified time window", + "log_level": "warning", + "exit_on_failure": False, + }, "expired_licenses": { "description": "No Expired Licenses", "log_level": "warning", "exit_on_failure": False, }, + "free_disk_space": { + "description": "Check if a there is enough space on the `/opt/panrepo` volume for downloading an PanOS image.", + "log_level": "error", + "exit_on_failure": True, + }, "ha": { - "description": "HA Status", + "description": "Checks HA pair status from the perspective of the current device", "log_level": "info", "exit_on_failure": False, }, + "ip_sec_tunnel_status": { + "description": "Check if a given IPsec tunnel is in active state", + "log_level": "warning", + "exit_on_failure": False, + }, + "jobs": { + "description": "Check for any job with status different than FIN", + "log_level": "warning", + "exit_on_failure": False, + }, + "ntp_sync": { + "description": "Check if NTP is synchronized", + "log_level": "warning", + "exit_on_failure": False, + }, + "planes_clock_sync": { + "description": "Check if the clock is synchronized between dataplane and management plane", + "log_level": "warning", + "exit_on_failure": False, + }, + "panorama": { + "description": "Check connectivity with the Panorama appliance", + "log_level": "warning", + "exit_on_failure": False, + }, + # "session_exist": { + # "description": "Check if a critical session is present in the sessions table", + # "log_level": "error", + # "exit_on_failure": True, + # }, } + REPORTS = [ + "arp_table", + "content_version", + "ip_sec_tunnels", + "license", + "nics", + "routes", + "session_stats", + ] + + STATE_SNAPSHOTS = [ + "arp_table", + "content_version", + "ip_sec_tunnels", + "license", + "nics", + "routes", + "session_stats", + ] + # ---------------------------------------------------------------------------- # Define models @@ -454,9 +490,9 @@ def check_readiness_and_log( if test_info["exit_on_failure"]: sys.exit(1) elif test_info["log_level"] == "warning": - logging.warning(f"{log_message}") + logging.info(f"Skipped Readiness Check: {test_info['description']}") else: - logging.info(log_message) + logging.debug(log_message) # ---------------------------------------------------------------------------- @@ -805,14 +841,14 @@ def run_assurance( operation_type: str, actions: List[str], config: Dict[str, Union[str, int, float, bool]], -) -> Union[AssuranceReport, None]: +) -> Union[SnapshotReport, ReadinessCheckReport, None]: """ Execute operational tasks on the Firewall and return the results or generate reports. Handles various operational tasks on the Firewall based on 'operation_type', such as performing readiness checks, capturing state snapshots, or generating reports. The function operates according to the specified 'actions' and 'config'. If the operation is successful, - it returns the results or an AssuranceReport object. If an invalid operation type or action + it returns the results or an SnapshotReport object. If an invalid operation type or action is specified, or an error occurs, the function logs an error and returns None. Parameters @@ -830,8 +866,8 @@ def run_assurance( Returns ------- - Union[AssuranceReport, None] - The results of the operation as an AssuranceReport object, or None if an invalid + Union[SnapshotReport, None] + The results of the operation as an SnapshotReport object, or None if an invalid operation type or action is specified, or if an error occurs. Raises @@ -855,7 +891,7 @@ def run_assurance( if operation_type == "readiness_check": for action in actions: - if action not in AssuranceOptions.READINESS_CHECKS: + if action not in AssuranceOptions.READINESS_CHECKS.keys(): logging.error(f"Invalid action for readiness check: {action}") sys.exit(1) @@ -866,12 +902,14 @@ def run_assurance( for ( test_name, test_info, - ) in AssuranceOptions.READINESS_CHECK_DESCRIPTIONS.items(): + ) in AssuranceOptions.READINESS_CHECKS.items(): check_readiness_and_log(result, test_name, test_info) + return ReadinessCheckReport(**result) + except Exception as e: logging.error(f"Error running readiness checks: {e}") - return + return None elif operation_type == "state_snapshot": # validate each type of action @@ -887,8 +925,8 @@ def run_assurance( logging.debug(results) if results: - # Pass the results to the AssuranceReport model - return AssuranceReport(hostname=hostname, **results) + # Pass the results to the SnapshotReport model + return SnapshotReport(hostname=hostname, **results) else: return None @@ -911,6 +949,88 @@ def run_assurance( return results +# ---------------------------------------------------------------------------- +# Perform the snapshot of the network state +# ---------------------------------------------------------------------------- +def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: + logging.info("Taking a snapshot of network state information...") + + # take snapshots + network_snapshot = run_assurance( + firewall, + hostname, + operation_type="state_snapshot", + actions=[ + "arp_table", + "content_version", + "ip_sec_tunnels", + "license", + "nics", + "routes", + "session_stats", + ], + config={}, + ) + + # Check if a readiness check was successfully created + if isinstance(network_snapshot, SnapshotReport): + logging.info("Network snapshot created successfully") + network_snapshot_json = network_snapshot.model_dump_json(indent=4) + logging.debug(network_snapshot_json) + + ensure_directory_exists(file_path) + + with open(file_path, "w") as file: + file.write(network_snapshot_json) + + logging.info( + f"Network state snapshot collected from {hostname}, saved to {file_path}" + ) + else: + logging.error("Failed to create snapshot") + + +# ---------------------------------------------------------------------------- +# Perform the readiness checks +# ---------------------------------------------------------------------------- +def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) -> None: + logging.info("Performing readiness checks of target firewall...") + + readiness_check = run_assurance( + firewall, + hostname, + operation_type="readiness_check", + actions=[ + "candidate_config", + "content_version", + "expired_licenses", + "ha", + "jobs", + "free_disk_space", + "ntp_sync", + "panorama", + "planes_clock_sync", + ], + config={}, + ) + + # Check if a readiness check was successfully created + if isinstance(readiness_check, ReadinessCheckReport): + # Do something with the readiness check report, e.g., log it, save it, etc. + logging.info("Readiness Checks completed") + readiness_check_report_json = readiness_check.model_dump_json(indent=4) + logging.debug(readiness_check_report_json) + + ensure_directory_exists(file_path) + + with open(file_path, "w") as file: + file.write(readiness_check_report_json) + + logging.info(f"Readiness checks completed for {hostname}, saved to {file_path}") + else: + logging.error("Failed to create readiness check") + + # ---------------------------------------------------------------------------- # Primary execution of the script # ---------------------------------------------------------------------------- @@ -982,70 +1102,26 @@ def main() -> None: f"{args['target_version']} has been downloaded and sync'd to HA peer." ) else: - logging.info(f"{args['target_version']} has been downloaded.") + logging.info(f"PAN-OS version {args['target_version']} has been downloaded.") - # Begin Snapshotting the network state + # Begin snapshots of the network state if image_downloaded: # Execute the pre-upgrade snapshot logging.info("Taking a pre-upgrade snapshot of network state information...") - assurance_report = run_assurance( + perform_snapshot( firewall, firewall_details.hostname, - operation_type="state_snapshot", - actions=[ - "arp_table", - "content_version", - "ip_sec_tunnels", - "license", - "nics", - "routes", - "session_stats", - ], - config={}, - ) - - # Check if an assurance report was successfully created - if assurance_report: - # Do something with the assurance report, e.g., log it, save it, etc. - logging.info("Assurance Report created successfully") - assurance_report_json = assurance_report.model_dump_json(indent=4) - logging.debug(assurance_report_json) - - # Ensure directory exists before writing the file - file_path = f'assurance/snapshots/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json' - ensure_directory_exists(file_path) - - with open(file_path, "w") as file: - file.write(assurance_report_json) - - else: - logging.error("Failed to create Assurance Report") - - logging.info( - f"Pre-Upgrade network state snapshot collected from {firewall_details.hostname}, saved to {file_path}" + f'assurance/snapshots/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json', ) # Execute Readiness Checks logging.info("Checking device to see if its ready for an upgrade...") - readiness_check = run_assurance( + perform_readiness_checks( firewall, firewall_details.hostname, - operation_type="readiness_check", - actions=[ - "candidate_config", - "content_version", - "expired_licenses", - "ha", - ], - config={}, + f'assurance/readiness_checks/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json', ) - # Check if an readiness check was successfully created - if readiness_check: - # Do something with the readiness check, e.g., log it, save it, etc. - logging.info("Readiness Checks completed") - logging.info(readiness_check) - if __name__ == "__main__": main()